今どきのアプリに必須な「複数のソースからのデータ取得」の実装

少し古いですが、ふと思い出した記事です。
ネットワークを使ったすべてのアプリに必須な考え方だと思います。

Loading data from multiple sources with RxJava

ネットワーク経由で問い合わせするデータがある場合、必要なときにそのまま取得することができますが、ディスクやメモリにキャッシュするほうが効率的です。

1. たまにネットワーク経由で新鮮なデータを取得する。
2. それ以外は、その結果をキャッシュしてできるだけ早く取得する。

RxJava を使ってこの実装をすると良いです。

 

基本となる流れ

Observable を使って、ネットワーク、ディスク、メモリーのそれぞれから取得します。

シンプルに2つのオペレーター concat() と first() を使います。

concat() は、複数の Observable を並び順に合成して、first() はその並び順から最初のものを実行します。よって、concat().first() とすると、複数のソースから最初のものが取得されます。


// Our sources (left as an exercise for the reader)
Observable<Data> memory = ...;
Observable<Data> disk = ...;
Observable<Data> network = ...;

// Retrieve the first source with data
Observable<Data> source = Observable
  .concat(memory, disk, network)
  .first();

このパターンの鍵となるのは、concat() が 必要なときにだけ、それぞれの子Observableを subscribe することです。

first() はシーケンスを早く止めるので、データがキャッシュされている場合に不要な遅いソースの問い合わせはありません。 言い換えれば、メモリから結果が返された場合、ディスクやネットワークにアクセスする必要はありません。 逆に、メモリとディスクがデータを持っていない場合は、新しくネットワークリクエストを行います。

 

データの保存

当然、次のステップは、ソースが入ってきたときにそれを保存することです。ネットワーク要求の結果をディスクに保存したり、ディスクへのリクエストをメモリにキャッシュしなければ、何の意味もありません。上記のコードは、常にネットワークリスエストを行うだけです。

私の考えた対策は、それぞれのソースがそれを発行するときにデータを保存/キャッシュすることです。


Observable<Data> networkWithSave = network.doOnNext(data -> {
  saveToDisk(data);
  cacheInMemory(data);
});

Observable<Data> diskWithCache = disk.doOnNext(data -> {
  cacheInMemory(data);
});

これで、networkWithSaveとdiskWithCacheを使用した場合、データはロード時に自動的に保存されます。

(この方法のもう1つの利点は、networkWithSave / diskWithCache はどこでも使用できることです。)

 

古いデータ

このままでは、古いままのデータが常に返されます。新鮮なデータの取得のために、たまにはサーバから取得しなければなりません。

解決策は、first() で。これはフィルタリングも実行できます。価値のないデータを除去するように設定します。


Observable<Data> source = Observable
  .concat(memory, diskWithCache, networkWithSave)
  .first(data -> data.isUpToDate());

これで、「最新」と見なされる最初のアイテムのみを発行します。したがって、そのデータソースが古い場合は、新しいデータが見つかるまで次のソースに進みます。

 

first() の代わりに takeFirst()

first() の代わりに takeFirst() を使うこともできます。

2つの違いは、いずれのソースも有効なデータを発行しない場合、first()は NoSuchElementException をスローするのに対し、takeFirst() は例外なしで完了することです。

どちらを使用するかは、データ不足を明示的に処理する必要があるかどうかによって異なります。

dlew/rxjava-multiple-sources-sample: Sample code demonstrating loading multiple data sources via RxJava


すばやく理解する「Room x RxJava 」

いい記事があったので。

Room 🔗 RxJava – Google Developers – Medium

まずは、Room で Dao.


@Query(“SELECT * FROM Users WHERE id = :userId”)
User getUserById(String userId);

ここまでで問題なのは、

1. 同期呼び出しでブロッキング。
2. データ変更時に再度呼び出す必要がある。

ということで、RxJava を使いたくなります。

Room は RxJava2.x に対応しています。

Adding Components to your Project | Android Developers

どのように使うのか?

Maybe


@Query(“SELECT * FROM Users WHERE id = :userId”)
Maybe<User> getUserById(String userId);

1. 該当ユーザがなければ、何も返さずに complete。
2. 該当ユーザがあれば、onSuccess となり complete。
3. Maybe が complete されたあとにユーザー情報が更新されても何もしない。

Single


@Query(“SELECT * FROM Users WHERE id = :userId”)
Single<User> getUserById(String userId);

1. 該当ユーザがなければ、何も返さず onError(EmptyResultException)。
2. 該当ユーザがあれば、onSuccess。
3. Single が complete されたあとにユーザー情報が更新されても何もしない。

Flowable


@Query(“SELECT * FROM Users WHERE id = :userId”)
Flowable<User> getUserById(String userId);

1. 該当ユーザはなければ、何も返さず emit もされない。当然、onNext も onError も呼ばれない。
2. ユーザが存在すれば、onNext。
3. ユーザ情報が更新されるたびに、自動で emit されるので、UI上を最新データに更新させることが可能になる。

 

まとめ

これだけ数行でデータベース、非同期処理を簡潔明快に説明できる Room x RxJava の組み合わせ。

おまけに Observable から細分化された RxJava2.x の主役たちの使い方も理解することができます。

素晴らしいですよね。


よくある Observable の入れ子ができない頭の硬さ乙

99

なぜか長ったらしくなる Observable のネスト.


Observable.just("yo")
    .flatMap(s -> Observable.range(0, 100))
    .subscribe(integer -> Ln.d("Here's an Integer(%s), but how do I access that juicy String?", integer));

Passing composite data through flatMap and similar operators without creating container objects · Issue #2931 · ReactiveX/RxJava

よくあるくせに綺麗に書けない.

そんなわたし乙.

lambda も絡めてゆっくりと眺める.

元.


Observable.just("yo")
    .flatMap(new Func1<String, Observable<? extends Integer>>() {
      @Override public Observable<? extends Integer> call(String s) {
        return Observable.range(0, 100);
      }
    })
    .map(new Func1<Integer, String>() {
      @Override public String call(Integer integer) {
        return String.format("Here's an Integer(%s), but how do I access that juicy String?", integer);
      }
    })
    .subscribe(new Action1<String>() {
      @Override public void call(String x) {
        System.out.println(x);
      }
    });

flatMap(Func1, Func2) でやる場合.


Observable.just("yo")
    .flatMap(new Func1<String, Observable<? extends Integer>>() {
      @Override public Observable<? extends Integer> call(String s) {
        return Observable.range(0, 100);
      }
    }, new Func2<String, Integer, String>() {
      @Override public String call(String s, Integer integer) {
        return String.format("Here's an Integer(%s), with String(%s)", integer, s);
      }
    })
    .subscribe(new Action1<String>() {
      @Override public void call(String x) {
        System.out.println(x);
      }
    });

flatmap に map を入れ子でやる場合.


Observable.just("yo")
    .flatMap(new Func1<String, Observable<? extends String>>() {
      @Override public Observable<? extends String> call(String s) {
        return Observable.range(0, 100)
            .map(new Func1<Integer, String>() {
              @Override public String call(Integer integer) {
                return String.format("Here's an Integer(%s), with String(%s)", integer, s);
              }
            });
      }
    })
    .subscribe(new Action1<String>() {
      @Override public void call(String x) {
        System.out.println(x);
      }
    });
  }

lambda へ

元.


Observable.just("yo")
    .flatMap(s -> Observable.range(0, 100))
    .map(integer -> String.format("Here's an Integer(%s), but how do I access that juicy String?", integer))
    .subscribe(System.out::println);

flatMap(Func1, Func2) でやる場合.


Observable.just("yo")
    .flatMap(s -> Observable.range(0, 100),
        (s, integer) -> String.format("Here's an Integer(%s), with String(%s)", integer, s))
    .subscribe(System.out::println);

flatmap に map を入れ子でやる場合.


Observable.just("yo")
    .flatMap(s -> Observable.range(0, 100)
        .map(integer -> String.format("Here's an Integer(%s), with String(%s)", integer, s)))
    .subscribe(System.out::println);

まとめ

flatmap に map を入れるのがわかりやすいように思います.

慣れたらラムダも直感的です.

Feature request: @Passthru variable for Rx chains · Issue #855 · square/retrofit


リアクティブにデータ更新検知して自動画面更新させる

Square 生まれのライブラリを使います.

SQLBrite

square/sqlbrite: A lightweight wrapper around SQLiteOpenHelper which introduces reactive stream semantics to SQL operations.

SQLiteOpenHelper をラップするように使い, ContentProvider(ContentResolver) のように更新を通知する役割を担当します.

SQLBrite__A_Reactive_Database_Foundation

SQLBrite: A Reactive Database Foundation

RxJava と連携させて使いましょう.


SqlBrite sqlBrite = SqlBrite.create();
BriteDatabase db = sqlBrite.wrapDatabaseHelper(openHelper, Schedulers.io());

// 対象テーブル名を含むSQLクエリーを登録
Observable<Query> users = db.createQuery("users", "SELECT * FROM users");
final AtomicInteger queries = new AtomicInteger();

// データ更新を監視して受け取る
users.subscribe(new Action1<Query>() {
  @Override public void call(Query query) {
    queries.getAndIncrement();
  }
});


System.out.println("Queries: " + queries.get()); // Prints 1

db.insert("users", createUser("jw", "Jake Wharton"));
db.insert("users", createUser("mattp", "Matt Precious"));
db.insert("users", createUser("strong", "Alec Strong"));

System.out.println("Queries: " + queries.get()); // Prints 4

このしくみを利用すると,

CursorLoader + ContentProvider

の部分を

RxJava + SQLBrite

で置き換えてリアクティブに記述できます.

MVP実装の例

googlesamples/android-architecture at dev-todo-mvp-rxjava

Observable を DataSource で作成して,

TasksLocalDataSource.java:77


    @Override
    public Observable<List<Task>> getTasks() {
        String[] projection = {
                TaskEntry.COLUMN_NAME_ENTRY_ID,
                TaskEntry.COLUMN_NAME_TITLE,
                TaskEntry.COLUMN_NAME_DESCRIPTION,
                TaskEntry.COLUMN_NAME_COMPLETED
        };
        String sql = String.format("SELECT %s FROM %s", TextUtils.join(",", projection), TaskEntry.TABLE_NAME);
        return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, sql)
                .mapToList(mTaskMapperFunction);
    }

Repository 経由させて,

TaskRepository:96


    @Override
    public Observable<List<Task>> getTasks() {
        // ...
        if (mCacheIsDirty) {
            return remoteTasks;
        } else {

            // Query the local storage if available. If not, query the network.
            Observable<List<Task>> localTasks = mTasksLocalDataSource.getTasks();
            return Observable.concat(localTasks, remoteTasks).first();

        }
    }

Presenter 上で subscribe しておけば,

TaskPresenter:104


        Subscription subscription = mTasksRepository
                .getTasks()
                .flatMap(new Func1<List<Task>, Observable<Task>>() {
                    @Override
                    public Observable<Task> call(List<Task> tasks) {
                        return Observable.from(tasks);
                    }
                })
                .filter(new Func1<Task, Boolean>() {
		  // ...
                })
                .toList()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<List<Task>>() {
                    @Override
                    public void onCompleted() {
                        mTasksView.setLoadingIndicator(false);
                    }

                    @Override
                    public void onError(Throwable e) {
                        mTasksView.showLoadingTasksError();
                    }

                    @Override
                    public void onNext(List<Task> tasks) {
                        processTasks(tasks);
                    }
                });

データベースで対象テーブルに更新されるとすぐに自動的に再読み込みが実行されて画面が更新されます.

MVP に CursorLoader を利用すると, Presenter で受け取るコールバックが煩雑になりやすいのでこれを使うと便利です.

DiffUtil で簡単に RecyclerView を更新する


Google発リアクティブなライブラリ「Agera」の評判を眺める

google_agera__Reactive_Programming_for_Android

Agera (Swedish for “to act”) is a super lightweight Android library that helps prepare data for consumption by the Android application components (such as Activities), or objects therein (such as Views), that have life-cycles in one form or another. It introduces a flavor of functional reactive programming, facilitates clear separation of the when, where and what factors of a data processing flow, and enables describing such a complex and asynchronous flow with a single expression, in near natural language.

Home · google/agera Wiki

どんなものなのか良く知らないので掲示板で評判を眺めて雰囲気を確認する.

REDDIT: AGERA - Reactive Programming for Android by Google : androiddev

どんな特徴なのか?

I'm a little surprised they don't mention RxJava at all. At first glance this looks like they wanted to create a simplified version of RxJava. It really doesn't seem to be that Android specific, except for the extensions.

(公式ドキュメントで) RxJavaについて全く言及していないのはのには少し驚きました. 一見, RxJavaの簡易版を作りたかったように見えます. また, エクステンションを除けば, Android に特化しているようにはみえないです.

The library is not very Android-specific tho, and it suffers from the same pitfalls loaders and other Google patterns/libs do: no rotation, poor unsubscribe logic.

まったく Androidに特化していません. Loaderや他のGoogleパターンやライブラリのように同じ落とし穴に悩まされる. ローテション不可, 貧弱な unsubscribe ロジック.

なぜつくったのか

I'd love to hear why they went with an in-house reactive library. A nice long blog post would be great.
Performance? Memory overhead? Android-OS specific concerns?

なぜ自社内でリアクティブライブラリを作成したのか聞きたいです. 素敵な長いブログは素晴らしいです. パフォーマンス? メモリーオーバーヘッド? Androiid OSに特化した何か?

Most likely because Google has a strong internal policy to avoid depending on external libraries, so they were probably forced to develop their own. Obviously, that doesn't make for a very good excuse, so they avoid the whole topic completely and don't even mention Rx a single time in their docs.

なぜなら Googleは外部ライブラリに依存しないという強力な内部ポリシーを持っている. それで, 多分開発を強いられたのでしょう. 明らかに, それは良い理由ではありません. それで, すべての話を完全に避けて, ドキュメント内で一回も言及していません.

I'm not a Googler, but I know of no such policy.
I do know (through decompilation) that applications such as Gmail were using Dagger 1.x as far back as 2-3 years ago. In addition, the Android platform uses many external dependencies, including sqlite and okhttp. So I'm curious where you get this information.

私はGoogleの者ではないが, そのようなポリシーはないと思います.
Gmail は2,3年前 Dagger1.x を利用していたことを (デコンパイルにより) 知っています.
加えて, Androidプラットフォームはsqliteやokhttpを含むたくさんの外部依存を用いています. どこでその情報を仕入れたのか興味がありますね.

The open source licenses are listed in their apps if the open source library requires it. I don't see Dagger listed in the Gmail app but I see it in Inbox, along with a whole slew of open source licenses.

もし, オープンソースライブラリを必要としているなら, オープンソースライセンスがそれらアプリに列挙されます. Dagger は Gmail アプリで列挙されているのをみたことないが, Inbox ではみました. たくさんのオープンソースライセンスの中にありました.

I'm guessing they built this before RxJava was available or widely used and are just now releasing it as open source.

私は, RxJavaが利用可能となり広く利用される前にこれは作成されたものと推測しています. そして, 今回オープンソースとして公開されただけだと.

Googleの人登場!

We don't compare Agera with RxJava because it's not meant to be a replacement for it nor it's our opinion that it should be replaced.

Reactive programming has many flavors and Agera proposes one that we know works well for Android. It was developed internally as part of Google Play Movies and we like to open source code whenever possible.

Question: what's the relation to RxJava? · Issue #20 · google/agera

我々は, Agera と RxJava を比較していません. なぜなら, それに代わるものを意味しているものでもなく, 置き換えるものであるという意見でもありません.

リアクティブプログラミングには多くのフレーバーがあり, Agera はAndroid上で問題なく動くもののひとつとして提案しています. Google Play ムービーの一部として内部で開発されたもので, 我々は, 可能な限りソースコードを公開したい.

今後どうなるのか?

Reading between the lines in some of the answers it seems like they wanted something Android specific.

コメントを読んでいると, Androidに特化した何かを欲しがっているね.

My major concern would be that this was written for Google Play Movies as mentioned in GitHub issue linked above. So it doesn't seem like a first class project at Google so the odds of it being actively maintained seem slim.

上の GitHubリンクで言及されているように. 一番の注目点はGooglePlayMoviesのために書かれたということでしょう. それは, Googleの重要なプロジェクトではないように見えるのでメンテナンスのアクティビィティが細っていく可能性がある.

RxJava から乗り換えたほうがいいのか

Unless there's some major advantage I'd say it's better to stick to the RxJava or RxAndroid from Wharton. I don't want to learn another api for something is already so popular.

大きな利点がないならば, Wharton の RxJava や RxAndroid にとどまったほうが良いといいたい. すでに学んだ有名なものに対して別のAPIを学びたくない.

RxJava, definitely. It's widely used, it's got great maintainers, it isn't Java/Andorid specific, the same pattern is used in a lot of languages.

絶対に RxJava がよいです. 広く使われており, すばらしいメンテナーたちがいます. Java-Android に特化されているわけでなく, たくさんの言語で同じパターンが使われています.

As a person still learning about reactive programming, should I stick with learning RxJava/RxAndroid or give AGERA a try?

リアクティブプログラミングを学んでいるものとして, RxJava/RxAndroid を引き続き勉強するべきか,または, AGERAを試してみるべきか?

If you are new, stick to Rx for now. Have a good grasp on it first. Mastering other reactive tech should be approximating and mapping similar logic from one to another. Those are opinionated APIs, but principles are setup. It is still early to make such claims but reactive knowledge should transfer between language/tech as well as polymorphism does today.

もしあなたが新人であれば, 今は Rx にどどまるべきです. まずそれをしっかり理解しなさい. 他のリアクティブ技術は 似たロジックで構成されています. APIは違うけれども原則は同じです. そのような意見ははまだ早すぎで, 今で言うポリフォーイズムと同じようにリアクティブの知識を持ってから言語や技術間を移動すべきです.

Sticking with RX would give you benefits are:
1. Multi language support. Migrating logic should be a breeze.
2. Great support on the maintainers side.
3. Any company that uses it will want to hire you more.
4. Syntax looks similar enough to other tech like streaming APIs of Java 9.

Rx のメリット:
1. 複数の言語サポート. ロジックの移行が簡単.
2. メンテナー側からの素晴らしいサポート.
3. それを使用するすべての企業はより多くのあなたを雇うことになるでしょう。
4. Java9の ストリーミングAPIのような他の技術に文法がかなり似ている.

今のところは

「乗り換えなくてもいいけど, 試してみる」くらいが妥当なところなのかな.