【Kotlin】SharedFlow と BroadcastChannel

👉 Shared flows, broadcast channels. See how shared flows made broadcast… | by Roman Elizarov | Medium 

 昔々、Kotlin に導入されたコルーチンは軽量でした。多数のコルーチンを起動しながら、恐怖の「変更可能な状態の共有」問題を避けながらコルーチン間で通信する方法が必要でした。

 そこで、コルーチン間通信プリミティブとして Channel が追加されました。Channel は素晴らしく、コルーチン間の、1対1、1対多、多対1、多対多の通信をサポートし Channel に送信された値はすべて1回だけ受信されます。

Diagram of many-to-many channel operation

 Channel を使用して、イベントや状態の更新を複数のサブスクライバーが独立しながら受信・反応できるように配信することはできません。

 こうして BroadcastChannel が導入され、その実装として buffered と ConflatedBroadcastChannel が導入されました。これらは、しばらくの間は役に立っていましたが、設計的には行き詰っていることがわかりました。現在、kotlinx-coroutinesバージョン1.4から、より良い解決策である SharedFlow を導入しています。詳しくはこの記事をご覧ください。

■ Flow はシンプル

 最初は Channel しかなかったので、非同期シーケンスのさまざまな変換をある Channel を引数にとり、結果として別の Channel を返す関数として実装していました。これは、フィルター演算子が独自のコルーチンで実行されることを意味します。

Diagram of filter operator with channels

 このような演算子の性能は、if 文を書いた場合と比較して、決して良いとは言えませんでした。後から考えてみると、これは驚くべきことではありません。なぜなら Channel は同期プリミティブだからです。どんな Channel でも、たとえプロデューサーとコンシューマーが1つずつの場合に最適化された実装であっても、同時に通信するコルーチンをサポートする必要があり、それらの間のデータ転送には同期が必要です。非同期データストリームの上にアプリケーションアーキテクチャを構築し始めると、当然、変換の必要性が出てきて Channel のコストが発生し始めます。

 Kotlin Flow のシンプルな設計により、変換演算子の効率的な実装が可能です。基本的なケースでは、値の emit から collect までが同じコルーチン内で行われ、同期の必要はありません。

Diagram of filter operator with flows

 同期は、異なるコルーチンでの値の emit と collect が必要な場合にのみ Flow を使って導入します。

■ Flow はコールド

 しかし Flow は一般的にコールドです flow { ... } ビルダー関数によって作成された Flow は、受動的な実体です。次のようなコードを考えてみましょう。


val coldFlow = flow {
    while (isActive) {
        emit(nextEvent)
    }
}

 Flow 自体は、いかなる種類の計算にも裏付けられておらず、collect が始まるまで、それ自体ではいかなる状態も持ちません。すべての collect コルーチンは、emit コードの独自のインスタンスを実行します。

Diagram of cold flow operation

 しかし、ユーザーのアクション、外部デバイスのイベント、状態の更新などはどのように処理するのでしょうか?これらは、それらに関心を持つコードがあるかどうかとは無関係に動作します。これらは、アプリケーション内の複数のオブザーバーをサポートする必要があります。これらはいわゆるイベントのホットソースです。

■ SharedFlow

 そこで登場するのが、SharedFlow という概念です。SharedFlow は、collect されているかどうかに関わらず存在できます。SharedFlow の collector はサブスクライバーと呼ばれます。SharedFlow のすべてのサブスクライバーは、同じシーケンスの値を受け取ります。これは Channel のオーバーヘッドのほとんどを伴わない BroadcastChannel のように効果的に機能します。これにより、BroadcastChannel の概念が廃止されます。

Diagram of shared flow operation

 本質的に SharedFlow は、軽量のブロードキャストイベントバスで、アプリケーションの中で作成して使用することができます。


class BroadcastEventBus {
    private val _events = MutableSharedFlow<Event>()
    val events = _events.asSharedFlow() // read-only public view

    suspend fun postEvent(event: Event) {
        _events.emit(event) // suspends until subscribers receive it
    }
}

 新しいサブスクライバーのために保持して再生する古いイベントの数 replay や、速いエミッタと遅いサブスクライバーにクッションを提供するための extraBufferCapacity など調整パラメータを持っています。

 SharedFlow のすべてのサブスクライバーは、それぞれのコンテキストでイベントを非同期的に collect しています。エミッターは、サブスクライバーがイベントの処理を終えるまで待ちません。しかし、SharedFlow のバッファがいっぱいになると、エミッタはバッファに余裕ができるまでサスペンドします。バッファオーバフロー時のエミッタの停止は、コレクターが追いつかないときに emit を遅らせるためのバックプレッシャーとなります。バッファオーバフローに対処する別の方法は、BufferOverlow パラメータでサポートされています。

■ StateFlow

 バッファオーバフローに対処するための一般的な方法は、最も古いイベントを削除し、最も新しい最新のイベントのみを保持することです。特に、アプリケーションの状態変数をモデル化するには最適な方法です。これは、廃止されたConflatedBroadcastChannel の代わりとなり、専用の StateFlow で広く使われているケースです。


class StateModel {
    private val _state = MutableStateFlow(initial)
    val state = _state.asStateFlow() // read-only public view

    fun update(newValue: Value) {
        _state.value = newValue // NOT suspending
    }
}

 val x: StateFlow は、var x: T の非同期でオブザーバブルなカウンターパートと考えてください。val x: StateFlow は、var x: T と同様に,常に最新の値を得ることができ、実際。最新の値が唯一の重要な値であるため、更新は中断することなく常に可能です.

 StateFlow では、複雑な Channel とシンプルな Flow の性能差がはっきりと現れます。StateFlow の実装では、アロケーションフリーの更新が行われますが、これは混信した BroadcastChannel ではそうではありませんでした。

■ Channel

 さまざまな種類の SharedFlow がさまざまな種類の BroadcastChannel に取って代わるにつれ、人気のある質問は、普通の Channel はどうなるのかということです。多くの理由から、Channel は存続するでしょう。1つの理由は、Channel が多くの複雑Flow 演算子を実装するために使用される低レベルのプリミティブであることです。
 しかし、Channel には応用的な使用例もあります。Channel は、正確に一度だけ処理しなければならないイベントを処理するために使用されます*(詳細は下記の注釈を参照)。これは、通常は1人の加入者がいるが、断続的に(起動時やある種の再構成時に)加入者が全くいないイベントタイプのデザインで、加入者が現れるまで投稿されたイベントをすべて保持しなければならないという要件がある場合に起こります。


class SingleShotEventBus {
    private val _events = Channel<Event>()
    val events = _events.receiveAsFlow() // expose as flow

    suspend fun postEvent(event: Event) {
        _events.send(event) // suspends on buffer overflow
    }
}

 最初の例の SharedFlow で書かれた BroadcastEventBus も、Channel で書かれたこの SingleShotEventBus も、イベントを Flow として公開していますが、重要な違いがあります。

SharedFlow では、イベントは未知数(ゼロ以上)のサブスクライバーにブロードキャストされます。サブスクライバーがいない場合、ポストされたイベントは直ちにドロップされます。これは、すぐに処理しなければならないイベントや、まったく処理しないイベントに使用するデザインパターンです。

 Channel では、各イベントが1人の購読者に配信されます。購読者のいないイベントを投稿しようとすると、Channel のバッファがいっぱいになるとすぐに中断され、購読者が現れるのを待ちます。投稿されたイベントはドロップされません。

 Channel を持つ SingleShotEventBus の実装では、キャンセルがない場合に限り、ポストされた各イベントを正確に1回だけ処理することに注意してください。Flow のサブスクライバーがキャンセルされると、イベントの配信に失敗することがあります。詳細については「Channel の未配信要素」を参照してください。

■ 結論

 SharedFlow と Channel の違いを理解し、適切に使い分けましょう。どちらも便利で一緒に使うことを前提に設計されています。
 しかし、BroadcastChannel は過去の遺物であり、将来的には非推奨となり削除されるでしょう。

👉 StateFlow の View への公開  
👉 【MVVM】 Kotlin Flow で使える5つの利用パターン 
👉 StateFlow は distinctUtilChanged 不要  


【YouTube】国土交通省 河川 ライブカメラ一覧

YouTube を利用したライブカメラ映像のライブ配信がここ1年で爆発的に整備されています。

江の川水系 江の川

かなり見やすくなりましたが、まだ、フォーマットにばらつきがあるようなので、一覧にしておきます。

 

北海道開発局

📺 hkd_mlitchannel
📺 国土交通省 北海道開発局 河川管理課

 

東北地方整備局

国土交通省 東北地方整備局
📺 【試験運用中】国土交通省福島河川国道事務所管内の河川カメラ ライブ映像 配信

 

関東地方整備局

国土交通省 関東地方整備局 広報チャンネル
📺 東京都水防チャンネル

 

北陸地方整備局

北陸地方整備局採用担当
📺 国土交通省 北陸地方整備局水災害対策センター

 

中部地方整備局

📺 国土交通省中部地方整備局

 

近畿地方整備局

国土交通省近畿地方整備局
📺 国土交通省 近畿地方整備局 河川部水災害予報センター

 

中国地方整備局

国土交通省中国地方整備局
📺 国土交通省中国地方整備局河川部

 

四国地方整備局

国土交通省四国地方整備局
📺 四国地方整備局 CCTV

 

九州地方整備局

国土交通省 九州地方整備局
📺 【試験配信中】 九州地方整備局

 

埋め込み不可な動画が多いので、各地域のライブ一覧的なページのリンクとなっています。

今後、未整備な地域は追加公開していくのだろうと思われます。


コロナ ワクチン 予約・接種可能な施設をかんたんに探すアプリ

各行政のシステムでは自分の地域の全施設の状態をひと目で把握できませんね。

できるかもしれないが、もういいです。

簡単なアプリがありました。

smartnews
👉 SmartNews 

そうです、あの「スマートニュース」です。

起動後、「ワクチン」タブから。

vaccination-facitities

「港区」の状況を確認してみる。

covid19-vaccination

vaccination-facitities

かんたんに、ワクチン予約・接種可能な施設名が分かるので、あとは電話やWEBから予約をいれればよい。

このアプリからもできるかもしれんが、予約が空いてる施設だけ判明すればまあ事足りる。

以下、動画で詳細でもどうぞ。



👉 新型コロナワクチンの検索結果 - Yahoo!地図