リアクティブにデータ更新検知して自動画面更新させる

Square 生まれのライブラリを使います.

SQLBrite

square/sqlbrite: A lightweight wrapper around SQLiteOpenHelper which introduces reactive stream semantics to SQL operations.

SQLiteOpenHelper をラップするように使い, ContentProvider(ContentResolver) のように更新を通知する役割を担当します.

SQLBrite__A_Reactive_Database_Foundation

SQLBrite: A Reactive Database Foundation

RxJava と連携させて使いましょう.


SqlBrite sqlBrite = SqlBrite.create();
BriteDatabase db = sqlBrite.wrapDatabaseHelper(openHelper, Schedulers.io());

// 対象テーブル名を含むSQLクエリーを登録
Observable<Query> users = db.createQuery("users", "SELECT * FROM users");
final AtomicInteger queries = new AtomicInteger();

// データ更新を監視して受け取る
users.subscribe(new Action1<Query>() {
  @Override public void call(Query query) {
    queries.getAndIncrement();
  }
});


System.out.println("Queries: " + queries.get()); // Prints 1

db.insert("users", createUser("jw", "Jake Wharton"));
db.insert("users", createUser("mattp", "Matt Precious"));
db.insert("users", createUser("strong", "Alec Strong"));

System.out.println("Queries: " + queries.get()); // Prints 4

このしくみを利用すると,

CursorLoader + ContentProvider

の部分を

RxJava + SQLBrite

で置き換えてリアクティブに記述できます.

MVP実装の例

googlesamples/android-architecture at dev-todo-mvp-rxjava

Observable を DataSource で作成して,

TasksLocalDataSource.java:77


    @Override
    public Observable<List<Task>> getTasks() {
        String[] projection = {
                TaskEntry.COLUMN_NAME_ENTRY_ID,
                TaskEntry.COLUMN_NAME_TITLE,
                TaskEntry.COLUMN_NAME_DESCRIPTION,
                TaskEntry.COLUMN_NAME_COMPLETED
        };
        String sql = String.format("SELECT %s FROM %s", TextUtils.join(",", projection), TaskEntry.TABLE_NAME);
        return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, sql)
                .mapToList(mTaskMapperFunction);
    }

Repository 経由させて,

TaskRepository:96


    @Override
    public Observable<List<Task>> getTasks() {
        // ...
        if (mCacheIsDirty) {
            return remoteTasks;
        } else {

            // Query the local storage if available. If not, query the network.
            Observable<List<Task>> localTasks = mTasksLocalDataSource.getTasks();
            return Observable.concat(localTasks, remoteTasks).first();

        }
    }

Presenter 上で subscribe しておけば,

TaskPresenter:104


        Subscription subscription = mTasksRepository
                .getTasks()
                .flatMap(new Func1<List<Task>, Observable<Task>>() {
                    @Override
                    public Observable<Task> call(List<Task> tasks) {
                        return Observable.from(tasks);
                    }
                })
                .filter(new Func1<Task, Boolean>() {
		  // ...
                })
                .toList()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<List<Task>>() {
                    @Override
                    public void onCompleted() {
                        mTasksView.setLoadingIndicator(false);
                    }

                    @Override
                    public void onError(Throwable e) {
                        mTasksView.showLoadingTasksError();
                    }

                    @Override
                    public void onNext(List<Task> tasks) {
                        processTasks(tasks);
                    }
                });

データベースで対象テーブルに更新されるとすぐに自動的に再読み込みが実行されて画面が更新されます.

MVP に CursorLoader を利用すると, Presenter で受け取るコールバックが煩雑になりやすいのでこれを使うと便利です.

DiffUtil で簡単に RecyclerView を更新する


Google発リアクティブなライブラリ「Agera」の評判を眺める

google_agera__Reactive_Programming_for_Android

Agera (Swedish for “to act”) is a super lightweight Android library that helps prepare data for consumption by the Android application components (such as Activities), or objects therein (such as Views), that have life-cycles in one form or another. It introduces a flavor of functional reactive programming, facilitates clear separation of the when, where and what factors of a data processing flow, and enables describing such a complex and asynchronous flow with a single expression, in near natural language.

Home · google/agera Wiki

どんなものなのか良く知らないので掲示板で評判を眺めて雰囲気を確認する.

REDDIT: AGERA - Reactive Programming for Android by Google : androiddev

どんな特徴なのか?

I'm a little surprised they don't mention RxJava at all. At first glance this looks like they wanted to create a simplified version of RxJava. It really doesn't seem to be that Android specific, except for the extensions.

(公式ドキュメントで) RxJavaについて全く言及していないのはのには少し驚きました. 一見, RxJavaの簡易版を作りたかったように見えます. また, エクステンションを除けば, Android に特化しているようにはみえないです.

The library is not very Android-specific tho, and it suffers from the same pitfalls loaders and other Google patterns/libs do: no rotation, poor unsubscribe logic.

まったく Androidに特化していません. Loaderや他のGoogleパターンやライブラリのように同じ落とし穴に悩まされる. ローテション不可, 貧弱な unsubscribe ロジック.

なぜつくったのか

I'd love to hear why they went with an in-house reactive library. A nice long blog post would be great.
Performance? Memory overhead? Android-OS specific concerns?

なぜ自社内でリアクティブライブラリを作成したのか聞きたいです. 素敵な長いブログは素晴らしいです. パフォーマンス? メモリーオーバーヘッド? Androiid OSに特化した何か?

Most likely because Google has a strong internal policy to avoid depending on external libraries, so they were probably forced to develop their own. Obviously, that doesn't make for a very good excuse, so they avoid the whole topic completely and don't even mention Rx a single time in their docs.

なぜなら Googleは外部ライブラリに依存しないという強力な内部ポリシーを持っている. それで, 多分開発を強いられたのでしょう. 明らかに, それは良い理由ではありません. それで, すべての話を完全に避けて, ドキュメント内で一回も言及していません.

I'm not a Googler, but I know of no such policy.
I do know (through decompilation) that applications such as Gmail were using Dagger 1.x as far back as 2-3 years ago. In addition, the Android platform uses many external dependencies, including sqlite and okhttp. So I'm curious where you get this information.

私はGoogleの者ではないが, そのようなポリシーはないと思います.
Gmail は2,3年前 Dagger1.x を利用していたことを (デコンパイルにより) 知っています.
加えて, Androidプラットフォームはsqliteやokhttpを含むたくさんの外部依存を用いています. どこでその情報を仕入れたのか興味がありますね.

The open source licenses are listed in their apps if the open source library requires it. I don't see Dagger listed in the Gmail app but I see it in Inbox, along with a whole slew of open source licenses.

もし, オープンソースライブラリを必要としているなら, オープンソースライセンスがそれらアプリに列挙されます. Dagger は Gmail アプリで列挙されているのをみたことないが, Inbox ではみました. たくさんのオープンソースライセンスの中にありました.

I'm guessing they built this before RxJava was available or widely used and are just now releasing it as open source.

私は, RxJavaが利用可能となり広く利用される前にこれは作成されたものと推測しています. そして, 今回オープンソースとして公開されただけだと.

Googleの人登場!

We don't compare Agera with RxJava because it's not meant to be a replacement for it nor it's our opinion that it should be replaced.

Reactive programming has many flavors and Agera proposes one that we know works well for Android. It was developed internally as part of Google Play Movies and we like to open source code whenever possible.

Question: what's the relation to RxJava? · Issue #20 · google/agera

我々は, Agera と RxJava を比較していません. なぜなら, それに代わるものを意味しているものでもなく, 置き換えるものであるという意見でもありません.

リアクティブプログラミングには多くのフレーバーがあり, Agera はAndroid上で問題なく動くもののひとつとして提案しています. Google Play ムービーの一部として内部で開発されたもので, 我々は, 可能な限りソースコードを公開したい.

今後どうなるのか?

Reading between the lines in some of the answers it seems like they wanted something Android specific.

コメントを読んでいると, Androidに特化した何かを欲しがっているね.

My major concern would be that this was written for Google Play Movies as mentioned in GitHub issue linked above. So it doesn't seem like a first class project at Google so the odds of it being actively maintained seem slim.

上の GitHubリンクで言及されているように. 一番の注目点はGooglePlayMoviesのために書かれたということでしょう. それは, Googleの重要なプロジェクトではないように見えるのでメンテナンスのアクティビィティが細っていく可能性がある.

RxJava から乗り換えたほうがいいのか

Unless there's some major advantage I'd say it's better to stick to the RxJava or RxAndroid from Wharton. I don't want to learn another api for something is already so popular.

大きな利点がないならば, Wharton の RxJava や RxAndroid にとどまったほうが良いといいたい. すでに学んだ有名なものに対して別のAPIを学びたくない.

RxJava, definitely. It's widely used, it's got great maintainers, it isn't Java/Andorid specific, the same pattern is used in a lot of languages.

絶対に RxJava がよいです. 広く使われており, すばらしいメンテナーたちがいます. Java-Android に特化されているわけでなく, たくさんの言語で同じパターンが使われています.

As a person still learning about reactive programming, should I stick with learning RxJava/RxAndroid or give AGERA a try?

リアクティブプログラミングを学んでいるものとして, RxJava/RxAndroid を引き続き勉強するべきか,または, AGERAを試してみるべきか?

If you are new, stick to Rx for now. Have a good grasp on it first. Mastering other reactive tech should be approximating and mapping similar logic from one to another. Those are opinionated APIs, but principles are setup. It is still early to make such claims but reactive knowledge should transfer between language/tech as well as polymorphism does today.

もしあなたが新人であれば, 今は Rx にどどまるべきです. まずそれをしっかり理解しなさい. 他のリアクティブ技術は 似たロジックで構成されています. APIは違うけれども原則は同じです. そのような意見ははまだ早すぎで, 今で言うポリフォーイズムと同じようにリアクティブの知識を持ってから言語や技術間を移動すべきです.

Sticking with RX would give you benefits are:
1. Multi language support. Migrating logic should be a breeze.
2. Great support on the maintainers side.
3. Any company that uses it will want to hire you more.
4. Syntax looks similar enough to other tech like streaming APIs of Java 9.

Rx のメリット:
1. 複数の言語サポート. ロジックの移行が簡単.
2. メンテナー側からの素晴らしいサポート.
3. それを使用するすべての企業はより多くのあなたを雇うことになるでしょう。
4. Java9の ストリーミングAPIのような他の技術に文法がかなり似ている.

今のところは

「乗り換えなくてもいいけど, 試してみる」くらいが妥当なところなのかな.


直感的に理解する RxJava その4: Reactive Android

ReactiveX

その1, その2, その3では, RxJava がどのように機能するかを書きました. Androidアプリの開発では, それをどのように利用するのでしょうか. ここでは Android開発者に向けての実践的なことを書いてみようと思います.

RxAndroid

RxAndroid は Android用の RxJavaエクステンションで, より簡単に利用できるようバインディングなどを含んだものです.

まず最初に, AndroidScheduler です. これは, 既存の Android のもつスレッド処理を考慮したスケジューラを提供します. 別に, UIスレッド処理に関係するコードは必要ありません.
AndroidSchedulers.mainThread() を利用するだけです.


retrofitService.getImage(url)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

もし, あなたが自作の Handler を利用しているのであれば HandlerThreadScheduler を利用して スケジューラとそれをリンクしましょう.

次に, AndroidObservable には, Androidライフサイクル内で有効に利用できる機能があります. それは bindActivity() と bindFragment() です. Observable に対してAndroidSchedulers.mainThread() を使うことで自動的に追加でき, Activity や Fragment が終了した場合にはアイテムを発することを停止することができます.


AndroidObservable.bindActivity(this, retrofitService.getImage(url))
    .subscribeOn(Schedulers.io())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

また, AndroidObservable.fromBroadcast() は便利です. これは, BroadcasrReceiver のように利用することができる Observable を作成できます. 以下が, ネットワーク接続状態が変化したときに通知するコードです.


IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
    .subscribe(intent -> handleConnectivityChange(intent));

その他に, View にバインディングできる ViewObserbable が2つあります. あるタイミングで View がクリックされたというイベントを取得したい場合には ViewObserbable.clicks(), TexitView が持っているテキストの変更を監視する ViewObservable.text() があります.


ViewObservable.clicks(mCardNameEditText, false)
    .subscribe(view -> handleClick(view));

Retrofit

Android用RESTクライアントの有名なライブラリである Retrofit は RxJava をサポートしています.

通常ではコールバックを追加して非同期処理を実装します.


@GET("/user/{id}/photo")
void getUserPhoto(@Path("id") int id, Callback<Photo> cb);

RxJava を利用していれば, これの代わりに Observable を返すことができます.


@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id") int id);

これだけで Observable に変換することができます. データを取得するだけでなく, 変換も同時に行います.

また, Retrofit は Observable に対して, 簡単に複数のRESTコールを合成することができます. 以下のサンプルでは, 写真データとそのメタデータを取得したいとします. zip を利用して取得結果を合成します.


Observable.zip(
    service.getUserPhoto(id),
    service.getPhotoMetadata(id),
    (photo, metadata) -> createPhotoWithData(photo, metadata))
    .subscribe(photoWithData -> showPhoto(photoWithData));

これに似たサンプルをその2 (flatMap() を利用) で紹介しました. 複数のREST処理は RxJava + Retrofit でこれだけ簡単に利用できるのです.

今までのコードや処理に時間のかかるコード

Retrofit は適切な Observable を返すことができることは分かりましたが, その他のライブラリを利用している場合はどうなるでしょうか. Observable に変換するコードが必要となるのでしょうか? すべてを変更せずにそのコードをどのようにして利用することができるのでしょうか ?

ほとんどの場合, そのコードから Observable を作成するには Observable.just() とObservable.from() で十分であると思います.


private Object oldMethod() { ... }

public Observable<Object> newMethod() {
    return Observable.just(oldMethod());
}

oldMethod() が短時間の処理であればこれで問題ないでしょう. しかし, 時間のかかる処理であればどうなるでしょう? Observable.just() の前に oldMethod() が実行されスレッドをブロックしてしまいます.

この問題の対応策として, 時間のかかる処理を defer() でラップするという技を私はいつも利用しています.


private Object slowBlockingMethod() { ... }

public Observable<Object> newMethod() {
    return Observable.defer(() -> Observable.just(slowBlockingMethod()));
}

このようにすると Observable が subscribe されるまでは slowBlockingMethod() から結果を返さないようになります.

ライフサイクル

最もややこしい部分です. Activity のライフサイクルに対しての操作はどうなるでしょう? 2つの問題がいつも登場します.

1.「Configuration が変更されるとき(端末の回転時など) に Subscription が継続している」

Retrofit を利用してRESTコールを行い, 外部から取得したデータを ListView に表示するとします. もし, ユーザが画面を回転させたらどうなるでしょうか? そのリクエストを継続したい場合はどうすればよいのでしょうか?

2. Observable が Context を参照したままでいるので メモリーリークが発生する.

この問題は Context を保持している Subscription が引き起こしています. View と連携しているときはわかりやすいですが, なぜか保持しています. 最終的には, たくさんのメモリーを食いつぶしてしまいます.

残念ながら, それぞれの問題には一発で解決できる方法はありませんが, 分かりやすく理解するためのいくつかのガイドラインがあります.

最初の問題は, RxJava のもつキャッシュ機能で解決できます. それを使うことで同じ Observable が重複して動かないように unsubscribe/resubscribe できます. cache() や replay() を使うことで, たとえ unsubscribe してる場合でさえ 遅延なしのリクエストを継続することができます. このことは, Activity が再作成後の新しい Subscription によりレジュームできることを意味しています.


Observable<Photo> request = service.getUserPhoto(id).cache();
Subscription sub = request.subscribe(photo -> handleUserPhoto(photo));

// ...When the Activity is being recreated...
sub.unsubscribe();

// ...Once the Activity is recreated...
request.subscribe(photo -> handleUserPhoto(photo));

注意点としては, 両方のタイミングで同じキャッシュされたリクエストを利用していることです. 遅延のないコールは一度しか行われません. どこにリクエストを保存するかはあなた次第ですが, ライフサイクルを考慮しながら, ライフサイクルの影響を受けないどこかに保存しなければなりません.

二つ目の問題は, ライフサイクルを考慮しながらの Subscription に対しての適切な unsubscribe で解決できます. これの一般的な方法は CompositeSubscription を使うことですべての Subscription を保持し, onDestroy() や onDestroyView() のタイミングに一括で unsubscribe する方法です.


private CompositeSubscription mCompositeSubscription
    = new CompositeSubscription();

private void doSomething() {
    mCompositeSubscription.add(
        AndroidObservable.bindActivity(this, Observable.just("Hello, World!"))
        .subscribe(s -> System.out.println(s)));
}

@Override
protected void onDestroy() {
    super.onDestroy();

    mCompositeSubscription.unsubscribe();
}

このように Activity/Fragment で CompositeSubscription を持つことで, それぞれを追加し, その後, 一括ですべてを unsubscribe することができます.

注意することは, 一度 CompositeSubscription.unsubscribe() するとそれは再利用できません. 再度利用する場合は 新しい CompositeSubscription を作成して利用しなければなりません.

という形でどちらの問題もコード追加で解決できます. これらのボイラープレートなしに問題を解決できる天才がいつか現れて欲しいです.

あとがき

Android についてはこれだけではありません. RxJava はこれからも更新され, Android にもさらに対応されていくでしょう. みなさんがこれらのことをさらに理解しようとするとき, RxAndroid はまだ開発中で, よいサンプルがまだありません. 私がここで提示したサンプルはこれから一年間は興味深いものになると思います.

その間, RxJava がコーディングを簡単にするだけのものだけでなく, もっとおもしろいものを探します. もしあなたがまだ納得していなければ, いつか私を見つけてください, それについてビールを飲みながら話します.

[原文] Grokking RxJava, Part 4: Reactive Android

直感的に理解する RxJava その1: 基本的な構成

直感的に理解する RxJava その2: Operator

直感的に理解する RxJava その3: リアクティブであることのメリット