直感的に理解する RxJava その2: Operator

ReactiveX

その1では, RxJava の基本的な構造と map() オペレータの紹介をしました. しかし, これだけではまだまだ実際には利用できません.

しかし, RxJava の実力の大部分はオペレータなので, すぐに利用できるようになることができます.

サンプルをみながらさらに Operator を学んでいきまよう.

設定

利用可能なこのメソッドがあるとします.


// テキスト検索からWEBサイトのURLの List を返す
Observable<List<String>> query(String text); 

テキストで検索してこれらの結果を表示したいとします. 前回の記事をふまえて考えると以下のようなものが思いつくかもしれません.


query("Hello, world!")
    .subscribe(urls -> {
        for (String url : urls) {
            System.out.println(url);
        }
    });

これは少し残念です. なぜなら, データストリームの変換を利用していません. もし, それぞれのURLを変形したい場合はすべてを Subscriber の中でやらなければなりません. 便利な map() を利用していません.

urls -> urls 内で map() を作成することもできますが, 残念ながら, その後のすべての map() の呼び出しのたびに for-each ループを持つことになります.

この方法ではどうか

Observable.from() はアイテムのコレクションを取得し, それぞれを一度に発することができます.


Observable.from("url1", "url2", "url3")
    .subscribe(url -> System.out.println(url));

こんな感じで使えます.


query("Hello, world!")
    .subscribe(urls -> {
        Observable.from(urls)
            .subscribe(url -> System.out.println(url));
    });

for-each ループをなくすことはできますが, 結果, コードはややこしくなり入れ子した Subscription になってしまいます. しかも変更しづらく, 実は RxJava の重要な機能を壊しています.

良い方法

ここで flatMap() の登場です.

Observable.flatMap() は Observable から 別の Observable を作成します. アイテムからストリームを作成し別の Observable とすることができます.


query("Hello, world!")
    .flatMap(new Func1<List<String>, Observable<String>>() {
        @Override
        public Observable<String> call(List<String> urls) {
            return Observable.from(urls);
        }
    })
    .subscribe(url -> System.out.println(url));

何が起こっているかわかりやすく書いていますが, ラムダ式で簡単にできますね.


query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .subscribe(url -> System.out.println(url));

ここで大事なことは「返された新しい Observable は Subscriber が利用するもの」ということです. それは, 連続した Observable.from() から返された独立した String たちです.

さらにより良く

flatMap() で欲しい Observable を返すことができましたがまだ十分ではありません.

2つ目の次のメソッドを可能にしなければならないと仮定します.


// サイトのタイトルを返す(もし404なら null を返す)
Observable<String> getTitle(String URL);

URLを表示する代わりに, そのページのタイトルを表示したい場合には問題が有りそうです. メソッドはそれぞれのURLに対してのみ動作して, String ではなくObservable を返します.

しかし, それも flatMap() で簡単に解決できます. Subscriber に到達する前にURLリストが個別のアイテムに分割された後は flatMap() の中でそれぞれの URLに対して getTitle() を使うことができます. .


query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String url) {
            return getTitle(url);
        }
    })
    .subscribe(title -> System.out.println(title));

そしてさらにラムダ式でシンプルにします.


query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .subscribe(title -> System.out.println(title));

複数のメソッドと Observable の返却を同時に組み合わせて行うことができます. すばらしいですね.

それだけでなく, どのように2つのAPIコールを組み合わせてひとつのチェインにいれたか, に注意してみてください. いくつでもAPIコールをいれることができます. 知ってますよね. APIコールの同期を保持して, コールバックを連携させて, データを取得することがどれだけ面倒か. これはすでに, コールバック地獄をスキップしています. そして同じロジックの中にまとめて短いリアクティブなコールになっています.

Oprerator の豊富さ

ここまでで2つの Operator しか使っていませんが, もっとたくさんあります. 他のものを使うとどれだけコードを改善できるでしょう?

URLが 404 のとき getTitle() は null を返します. "null" は表示させたくありませんね. 変更してそれらをフィルタしましょう.


query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .subscribe(title -> System.out.println(title));

fileter() は boolean のチェックを通過したもののみ発します.

そして, 今度は, 5個までの結果だけ表示しましょう.


query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .subscribe(title -> System.out.println(title));

take() は, 設定された数字を最大の個数として発します.

今度は, それぞれのタイトルを保存しましょう.


query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .doOnNext(title -> saveTitle(title))
    .subscribe(title -> System.out.println(title));

doOnNext() は, 追加したいふるまいをそれぞれのアイテムが発せられると同時に行います. この場合はタイトルを保存しています.

ここまで, ストリームデータをどれだけ簡単に操作できるかみてきました. 材料をあなたのレシピの合わせて混乱することなく何度も処理できますね.

RxJava には, 大量の Operator がありますが 何ができるか参照してみる価値はあると思います.
習得するのに時間がかかるかもしれませんが, 習得するとすぐに使えて役に立つと思います.

カスタムオペレータでさえ書くことができます. この記事では書きませんが, 基本あなたがやりたいと思えばできることでしょう.

それがどうしたの?

なぜ これらの operator を使うのか?

3: 「Operator は データストリームに対してどんな操作でもできる.」

限度となるのはあなた自身です.

シンプルな Operator のチェインで複雑なロジックを設定することができ, 構成したままの状態でコードを小分けに分割します。

それに加えて, 利用するために変換されたデータはどれだけシンプルになっているか考えてみてください. 最後のサンプルのコードでは, 2つのAPIをコールして, データを操作し, それをディスクに保存しました. しかし, Subscriber は それが利用するシンプルな Observable しか知りません. 「カプセル化」によりコーディングをより簡単にすることができます.

次回, その3ではRxJavaの他の素晴らしい機能であるエラーハンドリングと並行処理についてです.

[原文] Grokking RxJava, Part 2: Operator, Operator

直感的に理解する RxJava その1: 基本的な構成

直感的に理解する RxJava その3: リアクティブであることのメリット

直感的に理解する RxJava その4: Reactive Android