今どきのアプリに必須な「複数のソースからのデータ取得」の実装

少し古いですが、ふと思い出した記事です。
ネットワークを使ったすべてのアプリに必須な考え方だと思います。

Loading data from multiple sources with RxJava

ネットワーク経由で問い合わせするデータがある場合、必要なときにそのまま取得することができますが、ディスクやメモリにキャッシュするほうが効率的です。

1. たまにネットワーク経由で新鮮なデータを取得する。
2. それ以外は、その結果をキャッシュしてできるだけ早く取得する。

RxJava を使ってこの実装をすると良いです。

 

基本となる流れ

Observable を使って、ネットワーク、ディスク、メモリーのそれぞれから取得します。

シンプルに2つのオペレーター concat() と first() を使います。

concat() は、複数の Observable を並び順に合成して、first() はその並び順から最初のものを実行します。よって、concat().first() とすると、複数のソースから最初のものが取得されます。


// Our sources (left as an exercise for the reader)
Observable<Data> memory = ...;
Observable<Data> disk = ...;
Observable<Data> network = ...;

// Retrieve the first source with data
Observable<Data> source = Observable
  .concat(memory, disk, network)
  .first();

このパターンの鍵となるのは、concat() が 必要なときにだけ、それぞれの子Observableを subscribe することです。

first() はシーケンスを早く止めるので、データがキャッシュされている場合に不要な遅いソースの問い合わせはありません。 言い換えれば、メモリから結果が返された場合、ディスクやネットワークにアクセスする必要はありません。 逆に、メモリとディスクがデータを持っていない場合は、新しくネットワークリクエストを行います。

 

データの保存

当然、次のステップは、ソースが入ってきたときにそれを保存することです。ネットワーク要求の結果をディスクに保存したり、ディスクへのリクエストをメモリにキャッシュしなければ、何の意味もありません。上記のコードは、常にネットワークリスエストを行うだけです。

私の考えた対策は、それぞれのソースがそれを発行するときにデータを保存/キャッシュすることです。


Observable<Data> networkWithSave = network.doOnNext(data -> {
  saveToDisk(data);
  cacheInMemory(data);
});

Observable<Data> diskWithCache = disk.doOnNext(data -> {
  cacheInMemory(data);
});

これで、networkWithSaveとdiskWithCacheを使用した場合、データはロード時に自動的に保存されます。

(この方法のもう1つの利点は、networkWithSave / diskWithCache はどこでも使用できることです。)

 

古いデータ

このままでは、古いままのデータが常に返されます。新鮮なデータの取得のために、たまにはサーバから取得しなければなりません。

解決策は、first() で。これはフィルタリングも実行できます。価値のないデータを除去するように設定します。


Observable<Data> source = Observable
  .concat(memory, diskWithCache, networkWithSave)
  .first(data -> data.isUpToDate());

これで、「最新」と見なされる最初のアイテムのみを発行します。したがって、そのデータソースが古い場合は、新しいデータが見つかるまで次のソースに進みます。

 

first() の代わりに takeFirst()

first() の代わりに takeFirst() を使うこともできます。

2つの違いは、いずれのソースも有効なデータを発行しない場合、first()は NoSuchElementException をスローするのに対し、takeFirst() は例外なしで完了することです。

どちらを使用するかは、データ不足を明示的に処理する必要があるかどうかによって異なります。

dlew/rxjava-multiple-sources-sample: Sample code demonstrating loading multiple data sources via RxJava


すばやく理解する「Room x RxJava 」

いい記事があったので。

Room 🔗 RxJava – Google Developers – Medium

まずは、Room で Dao.


@Query(“SELECT * FROM Users WHERE id = :userId”)
User getUserById(String userId);

ここまでで問題なのは、

1. 同期呼び出しでブロッキング。
2. データ変更時に再度呼び出す必要がある。

ということで、RxJava を使いたくなります。

Room は RxJava2.x に対応しています。

Adding Components to your Project | Android Developers

どのように使うのか?

Maybe


@Query(“SELECT * FROM Users WHERE id = :userId”)
Maybe<User> getUserById(String userId);

1. 該当ユーザがなければ、何も返さずに complete。
2. 該当ユーザがあれば、onSuccess となり complete。
3. Maybe が complete されたあとにユーザー情報が更新されても何もしない。

Single


@Query(“SELECT * FROM Users WHERE id = :userId”)
Single<User> getUserById(String userId);

1. 該当ユーザがなければ、何も返さず onError(EmptyResultException)。
2. 該当ユーザがあれば、onSuccess。
3. Single が complete されたあとにユーザー情報が更新されても何もしない。

Flowable


@Query(“SELECT * FROM Users WHERE id = :userId”)
Flowable<User> getUserById(String userId);

1. 該当ユーザはなければ、何も返さず emit もされない。当然、onNext も onError も呼ばれない。
2. ユーザが存在すれば、onNext。
3. ユーザ情報が更新されるたびに、自動で emit されるので、UI上を最新データに更新させることが可能になる。

 

まとめ

これだけ数行でデータベース、非同期処理を簡潔明快に説明できる Room x RxJava の組み合わせ。

おまけに Observable から細分化された RxJava2.x の主役たちの使い方も理解することができます。

素晴らしいですよね。


よくある Observable の入れ子ができない頭の硬さ乙

99

なぜか長ったらしくなる Observable のネスト.


Observable.just("yo")
    .flatMap(s -> Observable.range(0, 100))
    .subscribe(integer -> Ln.d("Here's an Integer(%s), but how do I access that juicy String?", integer));

Passing composite data through flatMap and similar operators without creating container objects · Issue #2931 · ReactiveX/RxJava

よくあるくせに綺麗に書けない.

そんなわたし乙.

lambda も絡めてゆっくりと眺める.

元.


Observable.just("yo")
    .flatMap(new Func1<String, Observable<? extends Integer>>() {
      @Override public Observable<? extends Integer> call(String s) {
        return Observable.range(0, 100);
      }
    })
    .map(new Func1<Integer, String>() {
      @Override public String call(Integer integer) {
        return String.format("Here's an Integer(%s), but how do I access that juicy String?", integer);
      }
    })
    .subscribe(new Action1<String>() {
      @Override public void call(String x) {
        System.out.println(x);
      }
    });

flatMap(Func1, Func2) でやる場合.


Observable.just("yo")
    .flatMap(new Func1<String, Observable<? extends Integer>>() {
      @Override public Observable<? extends Integer> call(String s) {
        return Observable.range(0, 100);
      }
    }, new Func2<String, Integer, String>() {
      @Override public String call(String s, Integer integer) {
        return String.format("Here's an Integer(%s), with String(%s)", integer, s);
      }
    })
    .subscribe(new Action1<String>() {
      @Override public void call(String x) {
        System.out.println(x);
      }
    });

flatmap に map を入れ子でやる場合.


Observable.just("yo")
    .flatMap(new Func1<String, Observable<? extends String>>() {
      @Override public Observable<? extends String> call(String s) {
        return Observable.range(0, 100)
            .map(new Func1<Integer, String>() {
              @Override public String call(Integer integer) {
                return String.format("Here's an Integer(%s), with String(%s)", integer, s);
              }
            });
      }
    })
    .subscribe(new Action1<String>() {
      @Override public void call(String x) {
        System.out.println(x);
      }
    });
  }

lambda へ

元.


Observable.just("yo")
    .flatMap(s -> Observable.range(0, 100))
    .map(integer -> String.format("Here's an Integer(%s), but how do I access that juicy String?", integer))
    .subscribe(System.out::println);

flatMap(Func1, Func2) でやる場合.


Observable.just("yo")
    .flatMap(s -> Observable.range(0, 100),
        (s, integer) -> String.format("Here's an Integer(%s), with String(%s)", integer, s))
    .subscribe(System.out::println);

flatmap に map を入れ子でやる場合.


Observable.just("yo")
    .flatMap(s -> Observable.range(0, 100)
        .map(integer -> String.format("Here's an Integer(%s), with String(%s)", integer, s)))
    .subscribe(System.out::println);

まとめ

flatmap に map を入れるのがわかりやすいように思います.

慣れたらラムダも直感的です.

Feature request: @Passthru variable for Rx chains · Issue #855 · square/retrofit