直感的に理解する RxJava その4: Reactive Android

ReactiveX

その1, その2, その3では, RxJava がどのように機能するかを書きました. Androidアプリの開発では, それをどのように利用するのでしょうか. ここでは Android開発者に向けての実践的なことを書いてみようと思います.

RxAndroid

RxAndroid は Android用の RxJavaエクステンションで, より簡単に利用できるようバインディングなどを含んだものです.

まず最初に, AndroidScheduler です. これは, 既存の Android のもつスレッド処理を考慮したスケジューラを提供します. 別に, UIスレッド処理に関係するコードは必要ありません.
AndroidSchedulers.mainThread() を利用するだけです.


retrofitService.getImage(url)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

もし, あなたが自作の Handler を利用しているのであれば HandlerThreadScheduler を利用して スケジューラとそれをリンクしましょう.

次に, AndroidObservable には, Androidライフサイクル内で有効に利用できる機能があります. それは bindActivity() と bindFragment() です. Observable に対してAndroidSchedulers.mainThread() を使うことで自動的に追加でき, Activity や Fragment が終了した場合にはアイテムを発することを停止することができます.


AndroidObservable.bindActivity(this, retrofitService.getImage(url))
    .subscribeOn(Schedulers.io())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

また, AndroidObservable.fromBroadcast() は便利です. これは, BroadcasrReceiver のように利用することができる Observable を作成できます. 以下が, ネットワーク接続状態が変化したときに通知するコードです.


IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
    .subscribe(intent -> handleConnectivityChange(intent));

その他に, View にバインディングできる ViewObserbable が2つあります. あるタイミングで View がクリックされたというイベントを取得したい場合には ViewObserbable.clicks(), TexitView が持っているテキストの変更を監視する ViewObservable.text() があります.


ViewObservable.clicks(mCardNameEditText, false)
    .subscribe(view -> handleClick(view));

Retrofit

Android用RESTクライアントの有名なライブラリである Retrofit は RxJava をサポートしています.

通常ではコールバックを追加して非同期処理を実装します.


@GET("/user/{id}/photo")
void getUserPhoto(@Path("id") int id, Callback<Photo> cb);

RxJava を利用していれば, これの代わりに Observable を返すことができます.


@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id") int id);

これだけで Observable に変換することができます. データを取得するだけでなく, 変換も同時に行います.

また, Retrofit は Observable に対して, 簡単に複数のRESTコールを合成することができます. 以下のサンプルでは, 写真データとそのメタデータを取得したいとします. zip を利用して取得結果を合成します.


Observable.zip(
    service.getUserPhoto(id),
    service.getPhotoMetadata(id),
    (photo, metadata) -> createPhotoWithData(photo, metadata))
    .subscribe(photoWithData -> showPhoto(photoWithData));

これに似たサンプルをその2 (flatMap() を利用) で紹介しました. 複数のREST処理は RxJava + Retrofit でこれだけ簡単に利用できるのです.

今までのコードや処理に時間のかかるコード

Retrofit は適切な Observable を返すことができることは分かりましたが, その他のライブラリを利用している場合はどうなるでしょうか. Observable に変換するコードが必要となるのでしょうか? すべてを変更せずにそのコードをどのようにして利用することができるのでしょうか ?

ほとんどの場合, そのコードから Observable を作成するには Observable.just() とObservable.from() で十分であると思います.


private Object oldMethod() { ... }

public Observable<Object> newMethod() {
    return Observable.just(oldMethod());
}

oldMethod() が短時間の処理であればこれで問題ないでしょう. しかし, 時間のかかる処理であればどうなるでしょう? Observable.just() の前に oldMethod() が実行されスレッドをブロックしてしまいます.

この問題の対応策として, 時間のかかる処理を defer() でラップするという技を私はいつも利用しています.


private Object slowBlockingMethod() { ... }

public Observable<Object> newMethod() {
    return Observable.defer(() -> Observable.just(slowBlockingMethod()));
}

このようにすると Observable が subscribe されるまでは slowBlockingMethod() から結果を返さないようになります.

ライフサイクル

最もややこしい部分です. Activity のライフサイクルに対しての操作はどうなるでしょう? 2つの問題がいつも登場します.

1.「Configuration が変更されるとき(端末の回転時など) に Subscription が継続している」

Retrofit を利用してRESTコールを行い, 外部から取得したデータを ListView に表示するとします. もし, ユーザが画面を回転させたらどうなるでしょうか? そのリクエストを継続したい場合はどうすればよいのでしょうか?

2. Observable が Context を参照したままでいるので メモリーリークが発生する.

この問題は Context を保持している Subscription が引き起こしています. View と連携しているときはわかりやすいですが, なぜか保持しています. 最終的には, たくさんのメモリーを食いつぶしてしまいます.

残念ながら, それぞれの問題には一発で解決できる方法はありませんが, 分かりやすく理解するためのいくつかのガイドラインがあります.

最初の問題は, RxJava のもつキャッシュ機能で解決できます. それを使うことで同じ Observable が重複して動かないように unsubscribe/resubscribe できます. cache() や replay() を使うことで, たとえ unsubscribe してる場合でさえ 遅延なしのリクエストを継続することができます. このことは, Activity が再作成後の新しい Subscription によりレジュームできることを意味しています.


Observable<Photo> request = service.getUserPhoto(id).cache();
Subscription sub = request.subscribe(photo -> handleUserPhoto(photo));

// ...When the Activity is being recreated...
sub.unsubscribe();

// ...Once the Activity is recreated...
request.subscribe(photo -> handleUserPhoto(photo));

注意点としては, 両方のタイミングで同じキャッシュされたリクエストを利用していることです. 遅延のないコールは一度しか行われません. どこにリクエストを保存するかはあなた次第ですが, ライフサイクルを考慮しながら, ライフサイクルの影響を受けないどこかに保存しなければなりません.

二つ目の問題は, ライフサイクルを考慮しながらの Subscription に対しての適切な unsubscribe で解決できます. これの一般的な方法は CompositeSubscription を使うことですべての Subscription を保持し, onDestroy() や onDestroyView() のタイミングに一括で unsubscribe する方法です.


private CompositeSubscription mCompositeSubscription
    = new CompositeSubscription();

private void doSomething() {
    mCompositeSubscription.add(
        AndroidObservable.bindActivity(this, Observable.just("Hello, World!"))
        .subscribe(s -> System.out.println(s)));
}

@Override
protected void onDestroy() {
    super.onDestroy();

    mCompositeSubscription.unsubscribe();
}

このように Activity/Fragment で CompositeSubscription を持つことで, それぞれを追加し, その後, 一括ですべてを unsubscribe することができます.

注意することは, 一度 CompositeSubscription.unsubscribe() するとそれは再利用できません. 再度利用する場合は 新しい CompositeSubscription を作成して利用しなければなりません.

という形でどちらの問題もコード追加で解決できます. これらのボイラープレートなしに問題を解決できる天才がいつか現れて欲しいです.

あとがき

Android についてはこれだけではありません. RxJava はこれからも更新され, Android にもさらに対応されていくでしょう. みなさんがこれらのことをさらに理解しようとするとき, RxAndroid はまだ開発中で, よいサンプルがまだありません. 私がここで提示したサンプルはこれから一年間は興味深いものになると思います.

その間, RxJava がコーディングを簡単にするだけのものだけでなく, もっとおもしろいものを探します. もしあなたがまだ納得していなければ, いつか私を見つけてください, それについてビールを飲みながら話します.

[原文] Grokking RxJava, Part 4: Reactive Android

直感的に理解する RxJava その1: 基本的な構成

直感的に理解する RxJava その2: Operator

直感的に理解する RxJava その3: リアクティブであることのメリット


直感的に理解する RxJava その3: リアクティブであることのメリット

ReactiveX

RxJava について, その1では「基本的な構造」, その2では「強力な Operator」についてみてきました. しかし, あなたは RxJava のメリットについて納得していないでしょう. ここでは, RxJava フレームワークがもつ他のいくつかのメリットを明らかにしていきます.

エラーハンドリング

ここまで onComplete() と onError() には触れませんでしたが, これらは, Observable がアイテムを発するのを停止したときに, 成功完了なのか, エラー停止したのかを示しています.

Subscriber は onComplete() と onError() をリッスンする機能をもっています. 実際にやってみましょう.


Observable.just("Hello, world!")
    .map(s -> potentialException(s))
    .map(s -> anotherPotentialException(s))
    .subscribe(new Subscriber<String>() {
        @Override
        public void onNext(String s) { System.out.println(s); }

        @Override
        public void onCompleted() { System.out.println("Completed!"); }

        @Override
        public void onError(Throwable e) { System.out.println("Ouch!"); }
    });

potentialException() と anotherPotentialException() の両方が, Exceptionを throw する可能性があるとしましょう.

すべての Observable は一回のコールにつき onComplete() か onError() で終了します. そのとき, プログラム出力は "Completed!" かまたは, Exception が throw されたときは "Ouch!" を 最後に出力します.

これらから分かることがあります.

1.「Exception が throw されたときはいつでも onError() がコールされる」

このことは, エラーハンドリングを極めてシンプルに可能にすることを意味しています. 結果, これだけですべてのエラーをハンドルできます.

2.「Operator は Exception をハンドルする必要がない」

Exception は onError() までスキップするので Observable チェインのどこかで起きた問題をどのように処理するかを Subscriber までは気にしなくて良いということになります.

3.「Subscriber がアイテムの受取りを完了したかどうか把握できる」

タスクが終了するタイミングを知ることはコード上で必要になることがあります. ただ, Observable が終了しない可能性もあります.

このことは, これまでのエラーハンドリングに比べてかなり分かりやすいでしょう. コールバックを使う場合は, それぞれのコールバックでエラーをハンドルしなければならず, 同じコードの繰り返すことになりますし, それぞれのコールバックは, どのようにエラーをハンドルするか, を知らなければなりません. このことは, コールバック部分のコードは密に呼び出し側と連携されているということになります.

RxJava では, Observable は エラー時の処理を知る必要が無く, エラーの状態をハンドルする必要もありません. エラーの場合は処理をスキップして, すべてのエラーハンドリングを Subscriber に置くことができます.

Scheduler

Androidアプリでネットワークリクエストを利用するとします. それが時間がかかるものであれば, 別のスレッドでロードします. そこで問題がおきたりすることが有ります.

マルチスレッドのAndroidアプリは難しく, 正しいスレッドで正しいコードを実行することを確実に行わなければなりません. これは混乱してアプリがクラッシュする可能性がとなります. メインスレッド以外からViewを操作するとException が発生することはご存知だと思います.

RxJava では, subscribeOn() を使うことで どのスレッド上で実行するかを Observer に教えることができます. また observeOn() を使うことで Subscriber を実行するスレッドを指定できます.


myObservableServices.retrieveImage(url)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

シンプルですね. Subscriber までのすべての処理は I/Oスレッドで実行されます. そして最後にメインスレッドでViewの操作が行われます.

メインスレッドのブロックをできるだけ少なくするために, Subscriber はできるだけ軽くする.

これらは非常に便利で, すべての Observable に subscribeOn() と ObserveOn() を追加することができます. ただの Operator です. 何の Observable であるか, 前の Operator が何をしているかについては気にする必要はありません. 簡単にスレッドの振り分けを指示することができます.

Observable の処理に時間がかかる場合に, Subscriber がすでにI/Oスレッドで準備できていれば, それを監視する必要がないので subscribeOn() から observeOn() を先延ばしにすることは良い実験になります.

AsyncTask などでは, どの部分を並列処理にするかある程度設計しなければなりませんでしたが, RxJavaを使うと そのまま並列処理と追加するだけでよいのです.

Subscription

他にもまだ説明していないメリットがあります. Observable.subscribe() をコールした時これは Subscription を返します. これは, Observable と Subscriber のリンクです.


Subscription subscription = Observable.just("Hello, World!")
    .subscribe(s -> System.out.println(s));

このリンクである Subscription を以下のように利用することができます.


subscription.unsubscribe();
System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
// "Unsubscribed=true" と出力される

RxJava が unsubscribe を操作することの良いところは, チェインをストップすることができることです. 複雑な Operator のチェインを利用している場合, unsubscribe を使うことでいつでも実行中のコードを停止することができます. 他に処理は必要ありません.

その1で Observable.just() は, onNext() と onComplete() を単純にコールすることに比べて, すこし複雑であることを書きました. その理由は Subscription にあります. それは onNext() をコールする前に, Subscriber がすでに subscribe されているかどうかを実際にチェックするからです.

まとめ

気に止めておいて欲しいのは, これらの話は RxJava の序論です. ここで説明したことよりもっと多くの学ぶべきことがあると思います. たとえば, Backpressure について調べてみてください.

すべてに これらのリアクティブなコードは使う必要はありません. シンプルなロジックにしたい複雑なコードに対して利用しましょう.

これらの説明で このおもしろいフレームワークを始めるには十分だと思います. もっと学びたければ, 公式 RxJava Wiki を読むことをおすすめします. そして「可能性は無限」であることを忘れないでください.

元々は, この記事で最終にする予定でしたが Android 向けの実用的なサンプルのリクエストが多く, その4 を書いています.

[原文] Grokking RxJava, Part 3: Reactive with Benefits

直感的に理解する RxJava その1: 基本的な構成

直感的に理解する RxJava その2: Operator

直感的に理解する RxJava その4: Reactive Android


直感的に理解する RxJava その1: 基本的な構成

ReactiveX

最近 Android 開発者の間では RxJava が話題です. ただ最初はとっつきにくいのが難点です. 関数型リアクティブプログラミングは命令型プログラミングの感覚から近づくと難しいかもしれませんが, いったん理解すると素晴らしいです.

ここでは RxJava の特色を説明します. 4回分の記事はあなたを入り口に案内するためのもので, すべてを説明はできませんが RxJava やそれがどのように動くかに興味をもっていただければと思います.

基本的な使い方

リアクティブコードは Observable と Subscriber で構成されています. Observable はアイテムを発し, Subscriber はそれらのアイテムを受取ります.

アイテムの発し方には, パターンがあります, Observable は, 0から複数個のアイテムを発します. そして, すべてが成功して完了するか, エラーになると終了します. Observable が Subscriber.onNext() を繰り返し, Subscriber.onComplete() か Subscriber.onError() で終了します.

これは, 標準的な Observer パターンによく似ていますが, ひとつ大きな違いがあります. Observable は, subscribe されないかぎりアイテムを発しません. 言い換えれば, だれかにリッスンされていなければ, アイテムは発せられません.

Hello, world!

サンプルでどのように動くかみてみましょう. 最初は, 基本的な Observable をつくります.


Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, world!");
            sub.onCompleted();
        }
    }
);

この Observable は "Hello, world!" を発して終了します.

このデータを受けとる Subscriber を作ります.


Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }

    @Override
    public void onCompleted() { }

    @Override
    public void onError(Throwable e) { }
};

これがやっていることは, Observable によって発された文字列を表示するだけです.

Observable と Subscriber ができたので, それらを subscribe() を使ってフックします.


myObservable.subscribe(mySubscriber);

subscribe されたとき Observable は Subscriber の onNext() と onComplete() メソッドをコールします. その結果, Subscriber は "Hello, world!" を出力して終了します.

コードの簡略化

"Hello, world!" と出力させるだけでたくさんのボイラープレートコードがあります. これは何が起きているかすべてみることができるように冗長にしています. RxJavaではコードをシンプルにするためにたくさんのショートカットがあります.

最初に Observable を簡単にしてみましょう. RxJava は一般的な処理に対しての Observable の作成メソッドをいくつか持っています. この場合では Observable.just() を使って上のコードとまったく同じように ひとつのアイテムを発して完了するように書き換えることができます.


Observable<String> myObservable =
    Observable.just("Hello, world!");

次は, Subscriber です. まずは, onCompleted() や onError() を無視して, 代わりに onNext() のふるまいを定義するクラスを使います.


Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
};

Action は Subscriber の一部を定義することができます. Observable.subscribe() は, 3つまでの Actionパラメータ onNext(), onError(), and onComplete() を渡すことができ, Subscriber は以下のように書き換えることができます.


myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);

onError() と OnComplete() は無視するので必要なのは, 一つ目のパラメータだけです.


myObservable.subscribe(onNextAction);

ここで, メソッドチェインを使ってこれらを書き換えてみます.


Observable.just("Hello, world!")
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
              System.out.println(s);
        }
    });

最後に Java8 ラムダ式を使って Action1 のコードの冗長部分を取り除きます.


Observable.just("Hello, world!")
    .subscribe(s -> System.out.println(s));

Android であれば retrolambda の利用を強くお勧めします. 冗長なあなたのコードをおどろくほど短くしてくれます.

変換処理

少し書き換えてみましょう.

署名を "Hello, world!" に加える場合を考えてみます. まず考えられるのは Observable を変更することです.


Observable.just("Hello, world! -Dan")
    .subscribe(s -> System.out.println(s));

Observable をコントロールできる場合であればいいですが, 他人が作ったライブラリを使うことも考えられます. また 複数の場所で Observable を利用する場合は何回も書き加えることになります.

代わりに Subscriber を変更したらどうでしょう.


Observable.just("Hello, world!")
    .subscribe(s -> System.out.println(s + " -Dan"));

これはまだ満足できるものではありません. Subscriber はメインスレッドで実行されるかもしれないのでできるだけ軽量にしておくべきです. また, 概念的なレベルで見ると Subscriber は「反応するもの」で「変換するもの」ではありません.

これらの中間で "Hello, world!" を変換できれば良いのではないか?

Operator の紹介

ここでは, Operator を使ってどのように変換処理を行うかを紹介します. Operator は, 元の Observable から 最終の Subscriber までの間に, 発されたアイテムを操作します. RxJava にはかなり多くの Operator がありますが, 最初は少しだけ使ってみましょう.

この場合は, 発せられたアイテムを map() で変換することができます:


Observable.just("Hello, world!")
    .map(new Func1<String, String>() {
        @Override
        public String call(String s) {
            return s + " -Dan";
        }
    })
    .subscribe(s -> System.out.println(s));

再度, ラムダ式を使って簡単にします.


Observable.just("Hello, world!")
    .map(s -> s + " -Dan")
    .subscribe(s -> System.out.println(s));

かなりクールではないですか?

map() は基本的なアイテム変換の Operator です. 最終の Subscriber が使える形にまとめたり, データを完全に適合させるために, 多くの map() をチェインすることができます.

さらに map() を

map() には面白い特徴があります. 元 Observable と同じタイプのアイテムを発する必要はありません.

Subscriber がオリジナルのテキストを出力する代わりに テキストのハッシュを出力したいならば以下のようになります.


Observable.just("Hello, world!")
    .map(new Func1<String, Integer>() {
        @Override
        public Integer call(String s) {
            return s.hashCode();
        }
    })
    .subscribe(i -> System.out.println(Integer.toString(i)));

おもしろいですね.
String から始まったのに Subscriber は Integer を受け取ります. このコードをラムダ式で短くしましょう.


Observable.just("Hello, world!")
    .map(s -> s.hashCode())
    .subscribe(i -> System.out.println(Integer.toString(i)));

最初に言ったように, Subscriber はできるだけ短くしたいので, もうひとつ map() を使って String に戻しましょう.


Observable.just("Hello, world!")
    .map(s -> s.hashCode())
    .map(i -> Integer.toString(i))
    .subscribe(s -> System.out.println(s));

元のコードの件に戻ってみましょう. いくつかの変換のステップを追加するだけです。同様に署名変換を追加することができます。


Observable.just("Hello, world!")
    .map(s -> s + " -Dan")
    .map(s -> s.hashCode())
    .map(i -> Integer.toString(i))
    .subscribe(s -> System.out.println(s));

だから何?

ここであなたは思ったかもしれません.

「簡単なコードなのにいろいろやりすぎ」

確かにそうです.

これらは簡単なコードの例でしたが, 大事な2つの考え方が含まれています.

1:「Observable と Subscriber でいろんなことができる」

想像力を働かせてみましょう. いろんなことができます.

Observable は データベースクエリーとなることができます. Subscriber は 結果を受け取り画面に表示します. また, Observable は 画面のクリックにもなることもでき, Subscriber ではそれに反応できます. あるいは, Observable はインターネットから取得したバイトストリームになり, Subscriber ではそれをディスクに書き込むことができます.

どんな処理もできる一般的なフレームワークなのです.

2:「Observable と Subscriber はそれらの間にある変換処理から独立している」

最初の Observable と最後の Subscriber の間の map() 処理に集中して取り組む事ができます.
高度に組み合わせが可能で, データ操作しやすく, 入出力データが正しく Operator で操作できるようにいくらでもメソッドをチェインすることができます.

これら 2つのキーとなる考え方を組み合わせることでたくさんの潜在能力を引き出すことができます. ここでは, map() ひとつだけを利用しましたがこれだけでは限界があります. その2では RxJava で利用できる 多くの Operator を利用してみます.

[原文] Grokking RxJava, Part 1: The Basics

直感的に理解する RxJava その2: Operator

直感的に理解する RxJava その3: リアクティブであることのメリット

直感的に理解する RxJava その4: Reactive Android