もうすぐ8系捨てれるわ。
StateFlow の View への公開
以下のような ViewModel があったとして、バッキングプロパティ部分。
どう書いてますか。
class CounterModel {
private val _counter = MutableStateFlow(0)
??? counter ??? = _counter ???
fun inc() {
_counter.update { count -> count + 1 }
}
}
以下の登場時の開発の様子を参考に。
👉 Introduce StateFlow · Issue #1973 · Kotlin/kotlinx.coroutines
StateFlow は、状態を表す更新可能な値の Flow です。
- StateFlow
インターフェイスは、現在の値にアクセスするための読み取り専用で、値の更新を collect するための Flow を実装しています。
- MutabaleStateFlow
インターフェースは、値を変更する操作を追加しています。
- MutableStateFlow(x) のコンストラクタ関数が用意されています。この関数は、与えられた初期値を持つ MutableStateFlow の実装を返します。値への高速で非リアクティブなアクセスが必要な場合は StateFlow
として、値への更新のリアクティブな表示のみが必要な場合は Flow として、外部に公開することができます。
次のようにまとめることができます。
package kotlinx.coroutines.flow
interface StateFlow<T> : Flow<T> {
val value: T // always availabe, reading it never fails
}
interface MutableStateFlow<T> : StateFlow<T> {
override var value: T // can read & write value
}
fun <T> MutableStateFlow(value: T): MutableStateFlow<T> // constructor fun
よって、以下、ありがちな記述。(ないか。)
var counter = _counter // NG
val counter = _counter // NG
val counter: MutableStateFlow<Int> = _counter // NG
val counter: StateFlow<Int> = _counter // NG
val counter get() = _counter // NG
val counter: MutableStateFlow<Int> get() = _counter // NG
開発者の間でも、好き嫌いはあるようですが、以下の2パターンが良さげ。Read Only であることが大事。
val counter: StateFlow<Int> get() = _counter // OK
val counter = _counter.asStateFlow() // OK
よって、
class CounterModel {
private val _counter = MutableStateFlow(0)
val counter = _counter.asStateFlow()
fun inc() {
_counter.update { count -> count + 1 }
}
}
最近の言語の仕様は、オフィシャルドキュメントでは分かりずらいポリシーが多くあるように思います。
👉 【MVVM】 Kotlin Flow で使える5つの利用パターン
👉 StateFlow は distinctUtilChanged 不要
【Kotlin】SharedFlow と BroadcastChannel
昔々、Kotlin に導入されたコルーチンは軽量でした。多数のコルーチンを起動しながら、恐怖の「変更可能な状態の共有」問題を避けながらコルーチン間で通信する方法が必要でした。
そこで、コルーチン間通信プリミティブとして Channel が追加されました。Channel は素晴らしく、コルーチン間の、1対1、1対多、多対1、多対多の通信をサポートし Channel に送信された値はすべて1回だけ受信されます。
Channel を使用して、イベントや状態の更新を複数のサブスクライバーが独立しながら受信・反応できるように配信することはできません。
こうして BroadcastChannel が導入され、その実装として buffered と ConflatedBroadcastChannel が導入されました。これらは、しばらくの間は役に立っていましたが、設計的には行き詰っていることがわかりました。現在、kotlinx-coroutinesバージョン1.4から、より良い解決策である SharedFlow を導入しています。詳しくはこの記事をご覧ください。
■ Flow はシンプル
最初は Channel しかなかったので、非同期シーケンスのさまざまな変換をある Channel を引数にとり、結果として別の Channel を返す関数として実装していました。これは、フィルター演算子が独自のコルーチンで実行されることを意味します。
このような演算子の性能は、if 文を書いた場合と比較して、決して良いとは言えませんでした。後から考えてみると、これは驚くべきことではありません。なぜなら Channel は同期プリミティブだからです。どんな Channel でも、たとえプロデューサーとコンシューマーが1つずつの場合に最適化された実装であっても、同時に通信するコルーチンをサポートする必要があり、それらの間のデータ転送には同期が必要です。非同期データストリームの上にアプリケーションアーキテクチャを構築し始めると、当然、変換の必要性が出てきて Channel のコストが発生し始めます。
Kotlin Flow のシンプルな設計により、変換演算子の効率的な実装が可能です。基本的なケースでは、値の emit から collect までが同じコルーチン内で行われ、同期の必要はありません。
同期は、異なるコルーチンでの値の emit と collect が必要な場合にのみ Flow を使って導入します。
■ Flow はコールド
しかし Flow は一般的にコールドです flow { ... } ビルダー関数によって作成された Flow
val coldFlow = flow {
while (isActive) {
emit(nextEvent)
}
}
Flow 自体は、いかなる種類の計算にも裏付けられておらず、collect が始まるまで、それ自体ではいかなる状態も持ちません。すべての collect コルーチンは、emit コードの独自のインスタンスを実行します。
しかし、ユーザーのアクション、外部デバイスのイベント、状態の更新などはどのように処理するのでしょうか?これらは、それらに関心を持つコードがあるかどうかとは無関係に動作します。これらは、アプリケーション内の複数のオブザーバーをサポートする必要があります。これらはいわゆるイベントのホットソースです。
■ SharedFlow
そこで登場するのが、SharedFlow という概念です。SharedFlow は、collect されているかどうかに関わらず存在できます。SharedFlow の collector はサブスクライバーと呼ばれます。SharedFlow のすべてのサブスクライバーは、同じシーケンスの値を受け取ります。これは Channel のオーバーヘッドのほとんどを伴わない BroadcastChannel のように効果的に機能します。これにより、BroadcastChannel の概念が廃止されます。
本質的に SharedFlow は、軽量のブロードキャストイベントバスで、アプリケーションの中で作成して使用することができます。
class BroadcastEventBus {
private val _events = MutableSharedFlow<Event>()
val events = _events.asSharedFlow() // read-only public view
suspend fun postEvent(event: Event) {
_events.emit(event) // suspends until subscribers receive it
}
}
新しいサブスクライバーのために保持して再生する古いイベントの数 replay や、速いエミッタと遅いサブスクライバーにクッションを提供するための extraBufferCapacity など調整パラメータを持っています。
SharedFlow のすべてのサブスクライバーは、それぞれのコンテキストでイベントを非同期的に collect しています。エミッターは、サブスクライバーがイベントの処理を終えるまで待ちません。しかし、SharedFlow のバッファがいっぱいになると、エミッタはバッファに余裕ができるまでサスペンドします。バッファオーバフロー時のエミッタの停止は、コレクターが追いつかないときに emit を遅らせるためのバックプレッシャーとなります。バッファオーバフローに対処する別の方法は、BufferOverlow パラメータでサポートされています。
■ StateFlow
バッファオーバフローに対処するための一般的な方法は、最も古いイベントを削除し、最も新しい最新のイベントのみを保持することです。特に、アプリケーションの状態変数をモデル化するには最適な方法です。これは、廃止されたConflatedBroadcastChannel の代わりとなり、専用の StateFlow で広く使われているケースです。
class StateModel {
private val _state = MutableStateFlow(initial)
val state = _state.asStateFlow() // read-only public view
fun update(newValue: Value) {
_state.value = newValue // NOT suspending
}
}
val x: StateFlow
StateFlow では、複雑な Channel とシンプルな Flow の性能差がはっきりと現れます。StateFlow の実装では、アロケーションフリーの更新が行われますが、これは混信した BroadcastChannel ではそうではありませんでした。
■ Channel
さまざまな種類の SharedFlow がさまざまな種類の BroadcastChannel に取って代わるにつれ、人気のある質問は、普通の Channel はどうなるのかということです。多くの理由から、Channel は存続するでしょう。1つの理由は、Channel が多くの複雑Flow 演算子を実装するために使用される低レベルのプリミティブであることです。
しかし、Channel には応用的な使用例もあります。Channel は、正確に一度だけ処理しなければならないイベントを処理するために使用されます*(詳細は下記の注釈を参照)。これは、通常は1人の加入者がいるが、断続的に(起動時やある種の再構成時に)加入者が全くいないイベントタイプのデザインで、加入者が現れるまで投稿されたイベントをすべて保持しなければならないという要件がある場合に起こります。
class SingleShotEventBus {
private val _events = Channel<Event>()
val events = _events.receiveAsFlow() // expose as flow
suspend fun postEvent(event: Event) {
_events.send(event) // suspends on buffer overflow
}
}
最初の例の SharedFlow で書かれた BroadcastEventBus も、Channel で書かれたこの SingleShotEventBus も、イベントを Flow
SharedFlow では、イベントは未知数(ゼロ以上)のサブスクライバーにブロードキャストされます。サブスクライバーがいない場合、ポストされたイベントは直ちにドロップされます。これは、すぐに処理しなければならないイベントや、まったく処理しないイベントに使用するデザインパターンです。
Channel では、各イベントが1人の購読者に配信されます。購読者のいないイベントを投稿しようとすると、Channel のバッファがいっぱいになるとすぐに中断され、購読者が現れるのを待ちます。投稿されたイベントはドロップされません。
Channel を持つ SingleShotEventBus の実装では、キャンセルがない場合に限り、ポストされた各イベントを正確に1回だけ処理することに注意してください。Flow のサブスクライバーがキャンセルされると、イベントの配信に失敗することがあります。詳細については「Channel の未配信要素」を参照してください。
■ 結論
SharedFlow と Channel の違いを理解し、適切に使い分けましょう。どちらも便利で一緒に使うことを前提に設計されています。
しかし、BroadcastChannel は過去の遺物であり、将来的には非推奨となり削除されるでしょう。
👉 StateFlow の View への公開
👉 【MVVM】 Kotlin Flow で使える5つの利用パターン
👉 StateFlow は distinctUtilChanged 不要