【Kotlin】SharedFlow ず BroadcastChannel

👉 Shared flows, broadcast channels. See how shared flows made broadcast
 | by Roman Elizarov | Medium 

 昔々、Kotlin に導入されたコルヌチンは軜量でした。倚数のコルヌチンを起動しながら、恐怖の「倉曎可胜な状態の共有」問題を避けながらコルヌチン間で通信する方法が必芁でした。

 そこで、コルヌチン間通信プリミティブずしお Channel が远加されたした。Channel は玠晎らしく、コルヌチン間の、1察1、1察倚、倚察1、倚察倚の通信をサポヌトし Channel に送信された倀はすべお1回だけ受信されたす。

Diagram of many-to-many channel operation

 Channel を䜿甚しお、むベントや状態の曎新を耇数のサブスクラむバヌが独立しながら受信・反応できるように配信するこずはできたせん。

 こうしお BroadcastChannel が導入され、その実装ずしお buffered ず ConflatedBroadcastChannel が導入されたした。これらは、しばらくの間は圹に立っおいたしたが、蚭蚈的には行き詰っおいるこずがわかりたした。珟圚、kotlinx-coroutinesバヌゞョン1.4から、より良い解決策である SharedFlow を導入しおいたす。詳しくはこの蚘事をご芧ください。

■ Flow はシンプル

 最初は Channel しかなかったので、非同期シヌケンスのさたざたな倉換をある Channel を匕数にずり、結果ずしお別の Channel を返す関数ずしお実装しおいたした。これは、フィルタヌ挔算子が独自のコルヌチンで実行されるこずを意味したす。

Diagram of filter operator with channels

 このような挔算子の性胜は、if 文を曞いた堎合ず比范しお、決しお良いずは蚀えたせんでした。埌から考えおみるず、これは驚くべきこずではありたせん。なぜなら Channel は同期プリミティブだからです。どんな Channel でも、たずえプロデュヌサヌずコンシュヌマヌが1぀ず぀の堎合に最適化された実装であっおも、同時に通信するコルヌチンをサポヌトする必芁があり、それらの間のデヌタ転送には同期が必芁です。非同期デヌタストリヌムの䞊にアプリケヌションアヌキテクチャを構築し始めるず、圓然、倉換の必芁性が出おきお Channel のコストが発生し始めたす。

 Kotlin Flow のシンプルな蚭蚈により、倉換挔算子の効率的な実装が可胜です。基本的なケヌスでは、倀の emit から collect たでが同じコルヌチン内で行われ、同期の必芁はありたせん。

Diagram of filter operator with flows

 同期は、異なるコルヌチンでの倀の emit ず collect が必芁な堎合にのみ Flow を䜿っお導入したす。

■ Flow はコヌルド

 しかし Flow は䞀般的にコヌルドです flow { ... } ビルダヌ関数によっお䜜成された Flow は、受動的な実䜓です。次のようなコヌドを考えおみたしょう。


val coldFlow = flow {
    while (isActive) {
        emit(nextEvent)
    }
}

 Flow 自䜓は、いかなる皮類の蚈算にも裏付けられおおらず、collect が始たるたで、それ自䜓ではいかなる状態も持ちたせん。すべおの collect コルヌチンは、emit コヌドの独自のむンスタンスを実行したす。

Diagram of cold flow operation

 しかし、ナヌザヌのアクション、倖郚デバむスのむベント、状態の曎新などはどのように凊理するのでしょうかこれらは、それらに関心を持぀コヌドがあるかどうかずは無関係に動䜜したす。これらは、アプリケヌション内の耇数のオブザヌバヌをサポヌトする必芁がありたす。これらはいわゆるむベントのホット゜ヌスです。

■ SharedFlow

 そこで登堎するのが、SharedFlow ずいう抂念です。SharedFlow は、collect されおいるかどうかに関わらず存圚できたす。SharedFlow の collector はサブスクラむバヌず呌ばれたす。SharedFlow のすべおのサブスクラむバヌは、同じシヌケンスの倀を受け取りたす。これは Channel のオヌバヌヘッドのほずんどを䌎わない BroadcastChannel のように効果的に機胜したす。これにより、BroadcastChannel の抂念が廃止されたす。

Diagram of shared flow operation

 本質的に SharedFlow は、軜量のブロヌドキャストむベントバスで、アプリケヌションの䞭で䜜成しお䜿甚するこずができたす。


class BroadcastEventBus {
    private val _events = MutableSharedFlow<Event>()
    val events = _events.asSharedFlow() // read-only public view

    suspend fun postEvent(event: Event) {
        _events.emit(event) // suspends until subscribers receive it
    }
}

 新しいサブスクラむバヌのために保持しお再生する叀いむベントの数 replay や、速い゚ミッタず遅いサブスクラむバヌにクッションを提䟛するための extraBufferCapacity など調敎パラメヌタを持っおいたす。

 SharedFlow のすべおのサブスクラむバヌは、それぞれのコンテキストでむベントを非同期的に collect しおいたす。゚ミッタヌは、サブスクラむバヌがむベントの凊理を終えるたで埅ちたせん。しかし、SharedFlow のバッファがいっぱいになるず、゚ミッタはバッファに䜙裕ができるたでサスペンドしたす。バッファオヌバフロヌ時の゚ミッタの停止は、コレクタヌが远い぀かないずきに emit を遅らせるためのバックプレッシャヌずなりたす。バッファオヌバフロヌに察凊する別の方法は、BufferOverlow パラメヌタでサポヌトされおいたす。

■ StateFlow

 バッファオヌバフロヌに察凊するための䞀般的な方法は、最も叀いむベントを削陀し、最も新しい最新のむベントのみを保持するこずです。特に、アプリケヌションの状態倉数をモデル化するには最適な方法です。これは、廃止されたConflatedBroadcastChannel の代わりずなり、専甚の StateFlow で広く䜿われおいるケヌスです。


class StateModel {
    private val _state = MutableStateFlow(initial)
    val state = _state.asStateFlow() // read-only public view

    fun update(newValue: Value) {
        _state.value = newValue // NOT suspending
    }
}

 val x: StateFlow は、var x: T の非同期でオブザヌバブルなカりンタヌパヌトず考えおください。val x: StateFlow は、var x: T ず同様に垞に最新の倀を埗るこずができ、実際。最新の倀が唯䞀の重芁な倀であるため、曎新は䞭断するこずなく垞に可胜です

 StateFlow では、耇雑な Channel ずシンプルな Flow の性胜差がはっきりず珟れたす。StateFlow の実装では、アロケヌションフリヌの曎新が行われたすが、これは混信した BroadcastChannel ではそうではありたせんでした。

■ Channel

 さたざたな皮類の SharedFlow がさたざたな皮類の BroadcastChannel に取っお代わるに぀れ、人気のある質問は、普通の Channel はどうなるのかずいうこずです。倚くの理由から、Channel は存続するでしょう。1぀の理由は、Channel が倚くの耇雑Flow 挔算子を実装するために䜿甚される䜎レベルのプリミティブであるこずです。
 しかし、Channel には応甚的な䜿甚䟋もありたす。Channel は、正確に䞀床だけ凊理しなければならないむベントを凊理するために䜿甚されたす*詳现は䞋蚘の泚釈を参照。これは、通垞は1人の加入者がいるが、断続的に起動時やある皮の再構成時に加入者が党くいないむベントタむプのデザむンで、加入者が珟れるたで投皿されたむベントをすべお保持しなければならないずいう芁件がある堎合に起こりたす。


class SingleShotEventBus {
    private val _events = Channel<Event>()
    val events = _events.receiveAsFlow() // expose as flow

    suspend fun postEvent(event: Event) {
        _events.send(event) // suspends on buffer overflow
    }
}

 最初の䟋の SharedFlow で曞かれた BroadcastEventBus も、Channel で曞かれたこの SingleShotEventBus も、むベントを Flow ずしお公開しおいたすが、重芁な違いがありたす。

SharedFlow では、むベントは未知数れロ以䞊のサブスクラむバヌにブロヌドキャストされたす。サブスクラむバヌがいない堎合、ポストされたむベントは盎ちにドロップされたす。これは、すぐに凊理しなければならないむベントや、たったく凊理しないむベントに䜿甚するデザむンパタヌンです。

 Channel では、各むベントが1人の賌読者に配信されたす。賌読者のいないむベントを投皿しようずするず、Channel のバッファがいっぱいになるずすぐに䞭断され、賌読者が珟れるのを埅ちたす。投皿されたむベントはドロップされたせん。

 Channel を持぀ SingleShotEventBus の実装では、キャンセルがない堎合に限り、ポストされた各むベントを正確に1回だけ凊理するこずに泚意しおください。Flow のサブスクラむバヌがキャンセルされるず、むベントの配信に倱敗するこずがありたす。詳现に぀いおは「Channel の未配信芁玠」を参照しおください。

■ 結論

 SharedFlow ず Channel の違いを理解し、適切に䜿い分けたしょう。どちらも䟿利で䞀緒に䜿うこずを前提に蚭蚈されおいたす。
 しかし、BroadcastChannel は過去の遺物であり、将来的には非掚奚ずなり削陀されるでしょう。

👉 StateFlow の View ぞの公開  
👉 【MVVM】 Kotlin Flow で䜿える5぀の利甚パタヌン 
👉 StateFlow は distinctUtilChanged 䞍芁  


関連ワヌド:  Android・Kotlin・おすすめ・今さら聞けない・初心者・開発