昔々、Kotlin に導入されたコルーチンは軽量でした。多数のコルーチンを起動しながら、恐怖の「変更可能な状態の共有」問題を避けながらコルーチン間で通信する方法が必要でした。
そこで、コルーチン間通信プリミティブとして Channel が追加されました。Channel は素晴らしく、コルーチン間の、1対1、1対多、多対1、多対多の通信をサポートし Channel に送信された値はすべて1回だけ受信されます。
Channel を使用して、イベントや状態の更新を複数のサブスクライバーが独立しながら受信・反応できるように配信することはできません。
こうして BroadcastChannel が導入され、その実装として buffered と ConflatedBroadcastChannel が導入されました。これらは、しばらくの間は役に立っていましたが、設計的には行き詰っていることがわかりました。現在、kotlinx-coroutinesバージョン1.4から、より良い解決策である SharedFlow を導入しています。詳しくはこの記事をご覧ください。
■ Flow はシンプル
最初は Channel しかなかったので、非同期シーケンスのさまざまな変換をある Channel を引数にとり、結果として別の Channel を返す関数として実装していました。これは、フィルター演算子が独自のコルーチンで実行されることを意味します。
このような演算子の性能は、if 文を書いた場合と比較して、決して良いとは言えませんでした。後から考えてみると、これは驚くべきことではありません。なぜなら Channel は同期プリミティブだからです。どんな Channel でも、たとえプロデューサーとコンシューマーが1つずつの場合に最適化された実装であっても、同時に通信するコルーチンをサポートする必要があり、それらの間のデータ転送には同期が必要です。非同期データストリームの上にアプリケーションアーキテクチャを構築し始めると、当然、変換の必要性が出てきて Channel のコストが発生し始めます。
Kotlin Flow のシンプルな設計により、変換演算子の効率的な実装が可能です。基本的なケースでは、値の emit から collect までが同じコルーチン内で行われ、同期の必要はありません。
同期は、異なるコルーチンでの値の emit と collect が必要な場合にのみ Flow を使って導入します。
■ Flow はコールド
しかし Flow は一般的にコールドです flow { ... } ビルダー関数によって作成された Flow
val coldFlow = flow {
while (isActive) {
emit(nextEvent)
}
}
Flow 自体は、いかなる種類の計算にも裏付けられておらず、collect が始まるまで、それ自体ではいかなる状態も持ちません。すべての collect コルーチンは、emit コードの独自のインスタンスを実行します。
しかし、ユーザーのアクション、外部デバイスのイベント、状態の更新などはどのように処理するのでしょうか?これらは、それらに関心を持つコードがあるかどうかとは無関係に動作します。これらは、アプリケーション内の複数のオブザーバーをサポートする必要があります。これらはいわゆるイベントのホットソースです。
■ SharedFlow
そこで登場するのが、SharedFlow という概念です。SharedFlow は、collect されているかどうかに関わらず存在できます。SharedFlow の collector はサブスクライバーと呼ばれます。SharedFlow のすべてのサブスクライバーは、同じシーケンスの値を受け取ります。これは Channel のオーバーヘッドのほとんどを伴わない BroadcastChannel のように効果的に機能します。これにより、BroadcastChannel の概念が廃止されます。
本質的に SharedFlow は、軽量のブロードキャストイベントバスで、アプリケーションの中で作成して使用することができます。
class BroadcastEventBus {
private val _events = MutableSharedFlow<Event>()
val events = _events.asSharedFlow() // read-only public view
suspend fun postEvent(event: Event) {
_events.emit(event) // suspends until subscribers receive it
}
}
新しいサブスクライバーのために保持して再生する古いイベントの数 replay や、速いエミッタと遅いサブスクライバーにクッションを提供するための extraBufferCapacity など調整パラメータを持っています。
SharedFlow のすべてのサブスクライバーは、それぞれのコンテキストでイベントを非同期的に collect しています。エミッターは、サブスクライバーがイベントの処理を終えるまで待ちません。しかし、SharedFlow のバッファがいっぱいになると、エミッタはバッファに余裕ができるまでサスペンドします。バッファオーバフロー時のエミッタの停止は、コレクターが追いつかないときに emit を遅らせるためのバックプレッシャーとなります。バッファオーバフローに対処する別の方法は、BufferOverlow パラメータでサポートされています。
■ StateFlow
バッファオーバフローに対処するための一般的な方法は、最も古いイベントを削除し、最も新しい最新のイベントのみを保持することです。特に、アプリケーションの状態変数をモデル化するには最適な方法です。これは、廃止されたConflatedBroadcastChannel の代わりとなり、専用の StateFlow で広く使われているケースです。
class StateModel {
private val _state = MutableStateFlow(initial)
val state = _state.asStateFlow() // read-only public view
fun update(newValue: Value) {
_state.value = newValue // NOT suspending
}
}
val x: StateFlow
StateFlow では、複雑な Channel とシンプルな Flow の性能差がはっきりと現れます。StateFlow の実装では、アロケーションフリーの更新が行われますが、これは混信した BroadcastChannel ではそうではありませんでした。
■ Channel
さまざまな種類の SharedFlow がさまざまな種類の BroadcastChannel に取って代わるにつれ、人気のある質問は、普通の Channel はどうなるのかということです。多くの理由から、Channel は存続するでしょう。1つの理由は、Channel が多くの複雑Flow 演算子を実装するために使用される低レベルのプリミティブであることです。
しかし、Channel には応用的な使用例もあります。Channel は、正確に一度だけ処理しなければならないイベントを処理するために使用されます*(詳細は下記の注釈を参照)。これは、通常は1人の加入者がいるが、断続的に(起動時やある種の再構成時に)加入者が全くいないイベントタイプのデザインで、加入者が現れるまで投稿されたイベントをすべて保持しなければならないという要件がある場合に起こります。
class SingleShotEventBus {
private val _events = Channel<Event>()
val events = _events.receiveAsFlow() // expose as flow
suspend fun postEvent(event: Event) {
_events.send(event) // suspends on buffer overflow
}
}
最初の例の SharedFlow で書かれた BroadcastEventBus も、Channel で書かれたこの SingleShotEventBus も、イベントを Flow
SharedFlow では、イベントは未知数(ゼロ以上)のサブスクライバーにブロードキャストされます。サブスクライバーがいない場合、ポストされたイベントは直ちにドロップされます。これは、すぐに処理しなければならないイベントや、まったく処理しないイベントに使用するデザインパターンです。
Channel では、各イベントが1人の購読者に配信されます。購読者のいないイベントを投稿しようとすると、Channel のバッファがいっぱいになるとすぐに中断され、購読者が現れるのを待ちます。投稿されたイベントはドロップされません。
Channel を持つ SingleShotEventBus の実装では、キャンセルがない場合に限り、ポストされた各イベントを正確に1回だけ処理することに注意してください。Flow のサブスクライバーがキャンセルされると、イベントの配信に失敗することがあります。詳細については「Channel の未配信要素」を参照してください。
■ 結論
SharedFlow と Channel の違いを理解し、適切に使い分けましょう。どちらも便利で一緒に使うことを前提に設計されています。
しかし、BroadcastChannel は過去の遺物であり、将来的には非推奨となり削除されるでしょう。
👉 StateFlow の View への公開
👉 【MVVM】 Kotlin Flow で使える5つの利用パターン
👉 StateFlow は distinctUtilChanged 不要