【MVVM】 Kotlin Flow で使える5つの利用パターン

👉 【Kotlin】Flow の挙動やライフサイクルをログで確認する hatena-bookmark

以下、良記事の意訳です。

Migrating from LiveData to Kotlin’s Flow | Medium


Medium.com で表示

LiveData は、2017年に必要とされていました。オブザーバーパターンは私たちの生活を楽にしてくれましたが、RxJava などの選択肢は当時の初心者にとって複雑すぎました。Architecture Components チームは、LiveData (Android向けに設計された、非常にこだわりをもったオブザーバ型のデータホルダークラス) を作成しました。これは、簡単に始められるようにシンプルに作られており、より複雑なリアクティブストリームのケースでは、RxJava と統合して使用することが推奨されていました。

Java開発者、初心者、そしてシンプルな状況では、やはり LiveData が最適です。それ以外の人は、Kotlin Flow に移行するのが良いでしょう。Flow はまだ学習曲線が急ですが、Kotlin 言語の一部であり、Jetbrains がサポートしています。また、リアクティブモデルにぴったりの Compose も登場します。

以前から、View と ViewModel 以外のアプリの様々な部分をつなぐために Flow を使うことを話してきました。Android UIから Flow を collect する安全な方法ができたことで、完全な移行ガイドを作成することができます。

この記事では、View に Flow を公開する方法、Flow を collect する方法、そして特定のニーズに合わせて微調整する方法を学びます。

#1 : 可変データホルダーを使用したワンショット

これは典型的なパターンで、コルーチンの結果でステートホルダーを変異させるものです。

Expose the result of a one-shot operation with a Mutable data holder


class MyViewModel {
  private val _myUiState = MutableStateFlow<Result<UiState>>(Result.Loading)
  val myUiState: StateFlow<Result<UiState>> = _myUiState

  init {
    viewModelScope.launch {
      val result = ...
      _myUiState.value = result
    }
  }
}

StateFlow は、LiveData に最も近い SharedFlow です。

- 常に値を持っています。
- 値は1つしかありません。
- 複数のオブザーバーをサポートしています(それで Flow は shared)。
- アクティブなオブザーバーの数とは関係なく、サブスクリプション時に常に最新の値を返す。

UIの状態を View に公開するときは、StateFlow を使います。これは、UI状態を保持するために設計された安全で効率的なオブザーバーです。

#2 : ワンショット

これは、前のスニペットと同じで、可変型のバッキング・プロパティを持たずに、コルーチン呼び出しの結果を公開するものです。

ステートホルダーは常に値を持っているので、UIの状態を Loading、Success、Error などでサポートする Result クラスなどでラップすることは良いアイデアです。

Expose the result of a one-shot operation


class MyViewModel(...) : ViewModel() {
  val result: StateFlow<Result<UiState>> = flow {
    emit(repository.fetchItem())
  }.stateIn(
    scope = viewModelScope,
    started = WhileSubscribed(5000),
    initialValue = Result.Loading
  )
}

stateIn は、Flow を StateFlow に変換する Flow 演算子です。 詳細は後ほど説明します。

#3 : パラメータを使用したワンショットデータロード

例えば、ユーザーIDに依存するデータをロードしたい場合、フローを公開している AuthManager からこの情報を取得するとします。

One-shot data load with parameters


class MyViewModel(authManager..., repository...) : ViewModel() {
  private val userId: Flow<UserId> = authManager.observeUser().map { user -> user.id }

  val result: StateFlow<Result<Item>> = userId.mapLatest { newUserId ->
    repository.fetchItem(newUserId)
  }.stateIn(
    scope = viewModelScope,
    started = WhileSubscribed(5000),
    initialValue = Result.Loading
  )
}

もっと柔軟性が必要な場合は、transformLatest を使用して、アイテムを明示的に emit することもできますのでご注意ください。


val result = userId.transformLatest { newUserId ->
  emit(Result.LoadingData)
  emit(repository.fetchItem(newUserId))
}.stateIn(
  scope = viewModelScope,
  started = WhileSubscribed(5000),
  initialValue = Result.LoadingUser // Note the different Loading states
)

#4 : パラメータを使用してのデータのストリーム監視

では、この例をより反応性の高いものにしてみましょう。データはフェッチするのではなく、監視するので、データのソースの変更を自動的にUIに伝播させます。

例では、データソースで fetchItem を呼び出す代わりに、Flow を返す observeItem を使用します。

Observing a stream of data with parameters


class MyViewModel(authManager..., repository...) : ViewModel() {
  private val userId: Flow<String?> =
    authManager.observeUser().map { user -> user?.id }

  val result: StateFlow<Result<Item>> = userId.flatMapLatest { newUserId ->
    repository.observeItem(newUserId)
  }.stateIn(
    scope = viewModelScope,
    started = WhileSubscribed(5000),
    initialValue = Result.LoadingUser
  )
}

公開された StateFlow は、ユーザーが変更されたり、リポジトリ内のユーザーのデータが変更されたりするたびに更新を受け取ります。

#5 : 複数のソースを組み合わせる : MediatorLiveData -> Flow.combine

1つまたは複数の更新ソースを観察し、新しいデータを取得したときに何かを行うことができます。


val flow1: Flow<Int> = ...
val flow2: Flow<Int> = ...

val result = combine(flow1, flow2) { a, b -> a + b }

conbineTransform や zip 関数も同様に利用することができます。

■ 公開する StateFlow を設定する (stateIn 演算子)

最初に stateIn を使って通常の Flow を StateFlow に変換しましたが、これにはいくつかの設定が必要です。今すぐに詳細な設定をせず、コピペで済ませたい人には以下の組み合わせがおすすめです。


val result: StateFlow<Result<UiState>> = someFlow
  .stateIn(
     scope = viewModelScope,
     started = WhileSubscribed(5000),
     initialValue = Result.Loading
   )

しかし、その一見ランダムな5秒間の開始パラメータがよくわからないという方は、ぜひ読んでみてください。
stateIn には3つのパラメータがあります(docsより)。

@param scope 共有が開始されるコルーチンのスコープ。
@param started 共有の開始と停止を制御する戦略。
@param initialValue ステートフローの初期値。

この値は SharingStarted.WhileSubscribed で replayExpirationMillis パラメータを指定して StateFlow をリセットしたときにも使用されます。

started には3つの値があります。

Lazily : 最初のサブスクライバが現れたときに開始し、スコープがキャンセルされたときに停止します。
Eagerly : すぐに開始し、スコープがキャンセルされたら停止する。
WhileSubscribed : (複雑なので次で説明します。)

ワンショットの場合は Lazily か Eagerly を使います。しかし、Flow を監視している場合は WhileSubscribed を使って、以下に説明するような小さくても重要な最適化を行うべきです。

■ WhileSubscribed

WhileSubscribed は、collector がない場合に上流の Flow をキャンセルします。stateIn を使って作成された StateFlow は View にデータを公開していますが、同時に他のレイヤーや上流からの Flow も監視しています。これらの Flow をアクティブにしておくと、例えば、データベース接続やハードウェアセンサーなどの他のソースからデータを読み込み続ける場合、リソースの無駄遣いにつながる可能性があります。アプリがバックグラウンドに移行する際には、これらのコルーチンを停止する必要があります。

WhileSubscribed は2つのパラメータを取ります。


public fun WhileSubscribed(
  stopTimeoutMillis: Long = 0,
  replayExpirationMillis: Long = Long.MAX_VALUE
)

* Stop timeout

stopTimeoutMillis は、最後の加入者がいなくなってから、上流の Flow が停止するまでの遅延時間(ミリ秒)を設定します。デフォルトはゼロ(直ちに停止)です。

これは、View がほんの数秒リスニングを停止したときに、上流の Flow をキャンセルしたくない場合に便利です。このようなことはよくあります。例えば、ユーザーがデバイスを回転させたときに、View が destroy され、すぐに再作成されるような場合です。

liveData ビルダーでの解決策は、サブスクライバーが存在しない場合にコルーチンを停止する5秒の delay を追加することでした。WhileSubscribed(5000) はまさにそれを行います。


class MyViewModel(...) : ViewModel() {
  val result = userId.mapLatest { newUserId ->
    repository.observeItem(newUserId)
  }.stateIn(
    scope = viewModelScope,
    started = WhileSubscribed(5000),
    initialValue = Result.Loading
  )
}

この方法は、以下、すべての条件を満たしています。

- ユーザーがアプリをバックグラウンドに送ると、他のレイヤーからの更新が5秒後に停止し、バッテリーを節約できます。
- 最新の値はまだキャッシュされているので、ユーザーが戻ってきたときには、View にはすぐに何らかのデータが表示されることになります。
- サブスクリプションが再開されると、新しい値が入ってきて、利用可能な場合は画面が更新されます。

* Replay expiration

ユーザーが長時間離れているときに古いデータを表示したくなく、ローディング画面を表示したい場合は、WhileSubscribed の replayExpirationMillis パラメータをチェックしてください。キャッシュされた値が stateIn で定義された初期値に復元されるため、この状況では非常に便利で、メモリも節約できます。アプリに戻ってきたときの動作は遅くなりますが、古いデータは表示されません。

replayExpirationMillis - 共有コルーチンの停止とリプレイキャッシュのリセット( shareIn 演算子の場合はキャッシュを空にし、stateIn 演算子の場合はキャッシュされた値を元の初期値にリセットする)の間の遅延時間(ミリ秒)を設定します。デフォルトは Long.MAX_VALUE です(リプレイキャッシュを永遠に保持し、バッファをリセットしない)。ゼロの値を使用すると、キャッシュは直ちに失効します。

👉 【Kotlin】Flow の挙動やライフサイクルをログで確認する hatena-bookmark

■ View から StateFlow を監視する

これまで見てきたように、View が ViewModel の StateFlow に、もう監視していないことを知らせることは非常に重要です。しかし、ライフサイクルに関連するすべてのことがそうであるように、それはそれほど単純なことではありません。

Flow を collect するためにはコルーチンが必要ですが Activity や Fragment にはコルーチンビルダーがたくさんあります。

- Activity.lifecycleScope.launch:コルーチンを直ちに開始し、アクティビティが destroy されたときにそれをキャンセルする。
- Fragment.lifecycleScope.launch:コルーチンを直ちに開始し、フラグメントが destoroy されたときにキャンセルします。
- Fragment.viewLifecycleOwner
.lifecycleScope.launch:コルーチンを直ちに開始し、フラグメントの View ライフサイクルが destroy されたときにキャンセルします。UIを変更する場合は、ビューライフサイクルを使用する必要があります。

■ LaunchWhenStarted, launchWhenResumed…

特殊な launch である launchWhenX は、lifecycleOwner が X の状態になるまで待ち、lifecycleOwner が X の状態を下回ったときにコルーチンを中断します。注意すべき点は、ライフサイクル・オーナーが破壊されるまでコルーチンをキャンセルしないことです。

LaunchWhenStarted, launchWhenResumed…

アプリがバックグラウンドで動作しているときに更新情報を受信すると、クラッシュする可能性がありますが、これはビューでの collect を中断することで解決します。しかし、アプリがバックグラウンドで動作している間、上流の Flow はアクティブに保たれるため、リソースが無駄になる可能性があります。

つまり、これまで StateFlow を設定するために行ってきたことがすべて無駄になってしまうのです。しかし、これには新しい API があります。

■ lifecycle.repeatOnLifecycle を活用する

この新しいコルーチンビルダー(lifecycle-runtime-ktx 2.4.0-alpha01から入手可能)は、私たちが必要としていることを正確に実行します:コルーチンを特定の状態で開始し、ライフサイクルオーナーがそれ以下になると停止します。

lifecycle.repeatOnLifecycle to the rescue

たとえば、フラグメントの場合:


onCreateView(...) {
  viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
      myViewModel.myUiState.collect { ... }
    }
  }
}

これは、Fragment のビューが STARTED になったときに収集を開始し、RESUMED まで継続し、STOPPED に戻ったときに停止します。
詳しくは、A safer way to collect flow from Android UI をご覧ください。

repeatOnLifecycle APIと上記の StateFlow ガイダンスを混ぜ合わせることで、デバイスのリソースを有効活用しながら最高のパフォーマンスを得ることができます。

repeatOnLifecycle API

注意:
データバインディングに最近追加された StateFlow のサポートでは、更新情報の収集に launchWhenCreated を使用していますが、安定版では代わりにrepeatOnLifecycle を使用するようになります。

データバインディングでは、あらゆる場所で Flow を使用し asLiveData() を追加するだけで View に公開することができます。データバインディングは lifecycle-runtime-ktx 2.4.0 が安定した時点で更新される予定です。

■ まとめ

ViewModel からデータを公開し、View で監視する方法:

⭕ WhileSubscribed を使って、タイムアウト付きの StateFlow を公開します。
⭕ repeatOnLifecycle を使って collect します。

これ以外の組み合わせでは、上流側の Flow がアクティブになり、リソースが無駄になります。

❌ WhileSubscribed を使って公開し、lifecycleScope.launch/launchWhenX で collect する。
❌ Lazily/Eagerlyを使って公開し、repeatOnLifecycle で collect する。

もちろん、Flow のフルパワーを必要としない場合は LiveData を使えばいいのです。

👉 StateFlow は distinctUtilChanged 不要  
👉 StateFlow の View への公開 
👉 【MVVM】Flow vs LiveData 
👉 【Kotlin】SharedFlow と BroadcastChannel 



【Kotlin】SharedFlow と BroadcastChannel

👉 Shared flows, broadcast channels. See how shared flows made broadcast… | by Roman Elizarov | Medium 

 昔々、Kotlin に導入されたコルーチンは軽量でした。多数のコルーチンを起動しながら、恐怖の「変更可能な状態の共有」問題を避けながらコルーチン間で通信する方法が必要でした。

 そこで、コルーチン間通信プリミティブとして Channel が追加されました。Channel は素晴らしく、コルーチン間の、1対1、1対多、多対1、多対多の通信をサポートし Channel に送信された値はすべて1回だけ受信されます。

Diagram of many-to-many channel operation

 Channel を使用して、イベントや状態の更新を複数のサブスクライバーが独立しながら受信・反応できるように配信することはできません。

 こうして BroadcastChannel が導入され、その実装として buffered と ConflatedBroadcastChannel が導入されました。これらは、しばらくの間は役に立っていましたが、設計的には行き詰っていることがわかりました。現在、kotlinx-coroutinesバージョン1.4から、より良い解決策である SharedFlow を導入しています。詳しくはこの記事をご覧ください。

■ Flow はシンプル

 最初は Channel しかなかったので、非同期シーケンスのさまざまな変換をある Channel を引数にとり、結果として別の Channel を返す関数として実装していました。これは、フィルター演算子が独自のコルーチンで実行されることを意味します。

Diagram of filter operator with channels

 このような演算子の性能は、if 文を書いた場合と比較して、決して良いとは言えませんでした。後から考えてみると、これは驚くべきことではありません。なぜなら Channel は同期プリミティブだからです。どんな Channel でも、たとえプロデューサーとコンシューマーが1つずつの場合に最適化された実装であっても、同時に通信するコルーチンをサポートする必要があり、それらの間のデータ転送には同期が必要です。非同期データストリームの上にアプリケーションアーキテクチャを構築し始めると、当然、変換の必要性が出てきて Channel のコストが発生し始めます。

 Kotlin Flow のシンプルな設計により、変換演算子の効率的な実装が可能です。基本的なケースでは、値の emit から collect までが同じコルーチン内で行われ、同期の必要はありません。

Diagram of filter operator with flows

 同期は、異なるコルーチンでの値の emit と collect が必要な場合にのみ Flow を使って導入します。

■ Flow はコールド

 しかし Flow は一般的にコールドです flow { ... } ビルダー関数によって作成された Flow は、受動的な実体です。次のようなコードを考えてみましょう。


val coldFlow = flow {
    while (isActive) {
        emit(nextEvent)
    }
}

 Flow 自体は、いかなる種類の計算にも裏付けられておらず、collect が始まるまで、それ自体ではいかなる状態も持ちません。すべての collect コルーチンは、emit コードの独自のインスタンスを実行します。

Diagram of cold flow operation

 しかし、ユーザーのアクション、外部デバイスのイベント、状態の更新などはどのように処理するのでしょうか?これらは、それらに関心を持つコードがあるかどうかとは無関係に動作します。これらは、アプリケーション内の複数のオブザーバーをサポートする必要があります。これらはいわゆるイベントのホットソースです。

■ SharedFlow

 そこで登場するのが、SharedFlow という概念です。SharedFlow は、collect されているかどうかに関わらず存在できます。SharedFlow の collector はサブスクライバーと呼ばれます。SharedFlow のすべてのサブスクライバーは、同じシーケンスの値を受け取ります。これは Channel のオーバーヘッドのほとんどを伴わない BroadcastChannel のように効果的に機能します。これにより、BroadcastChannel の概念が廃止されます。

Diagram of shared flow operation

 本質的に SharedFlow は、軽量のブロードキャストイベントバスで、アプリケーションの中で作成して使用することができます。


class BroadcastEventBus {
    private val _events = MutableSharedFlow<Event>()
    val events = _events.asSharedFlow() // read-only public view

    suspend fun postEvent(event: Event) {
        _events.emit(event) // suspends until subscribers receive it
    }
}

 新しいサブスクライバーのために保持して再生する古いイベントの数 replay や、速いエミッタと遅いサブスクライバーにクッションを提供するための extraBufferCapacity など調整パラメータを持っています。

 SharedFlow のすべてのサブスクライバーは、それぞれのコンテキストでイベントを非同期的に collect しています。エミッターは、サブスクライバーがイベントの処理を終えるまで待ちません。しかし、SharedFlow のバッファがいっぱいになると、エミッタはバッファに余裕ができるまでサスペンドします。バッファオーバフロー時のエミッタの停止は、コレクターが追いつかないときに emit を遅らせるためのバックプレッシャーとなります。バッファオーバフローに対処する別の方法は、BufferOverlow パラメータでサポートされています。

■ StateFlow

 バッファオーバフローに対処するための一般的な方法は、最も古いイベントを削除し、最も新しい最新のイベントのみを保持することです。特に、アプリケーションの状態変数をモデル化するには最適な方法です。これは、廃止されたConflatedBroadcastChannel の代わりとなり、専用の StateFlow で広く使われているケースです。


class StateModel {
    private val _state = MutableStateFlow(initial)
    val state = _state.asStateFlow() // read-only public view

    fun update(newValue: Value) {
        _state.value = newValue // NOT suspending
    }
}

 val x: StateFlow は、var x: T の非同期でオブザーバブルなカウンターパートと考えてください。val x: StateFlow は、var x: T と同様に,常に最新の値を得ることができ、実際。最新の値が唯一の重要な値であるため、更新は中断することなく常に可能です.

 StateFlow では、複雑な Channel とシンプルな Flow の性能差がはっきりと現れます。StateFlow の実装では、アロケーションフリーの更新が行われますが、これは混信した BroadcastChannel ではそうではありませんでした。

■ Channel

 さまざまな種類の SharedFlow がさまざまな種類の BroadcastChannel に取って代わるにつれ、人気のある質問は、普通の Channel はどうなるのかということです。多くの理由から、Channel は存続するでしょう。1つの理由は、Channel が多くの複雑Flow 演算子を実装するために使用される低レベルのプリミティブであることです。
 しかし、Channel には応用的な使用例もあります。Channel は、正確に一度だけ処理しなければならないイベントを処理するために使用されます*(詳細は下記の注釈を参照)。これは、通常は1人の加入者がいるが、断続的に(起動時やある種の再構成時に)加入者が全くいないイベントタイプのデザインで、加入者が現れるまで投稿されたイベントをすべて保持しなければならないという要件がある場合に起こります。


class SingleShotEventBus {
    private val _events = Channel<Event>()
    val events = _events.receiveAsFlow() // expose as flow

    suspend fun postEvent(event: Event) {
        _events.send(event) // suspends on buffer overflow
    }
}

 最初の例の SharedFlow で書かれた BroadcastEventBus も、Channel で書かれたこの SingleShotEventBus も、イベントを Flow として公開していますが、重要な違いがあります。

SharedFlow では、イベントは未知数(ゼロ以上)のサブスクライバーにブロードキャストされます。サブスクライバーがいない場合、ポストされたイベントは直ちにドロップされます。これは、すぐに処理しなければならないイベントや、まったく処理しないイベントに使用するデザインパターンです。

 Channel では、各イベントが1人の購読者に配信されます。購読者のいないイベントを投稿しようとすると、Channel のバッファがいっぱいになるとすぐに中断され、購読者が現れるのを待ちます。投稿されたイベントはドロップされません。

 Channel を持つ SingleShotEventBus の実装では、キャンセルがない場合に限り、ポストされた各イベントを正確に1回だけ処理することに注意してください。Flow のサブスクライバーがキャンセルされると、イベントの配信に失敗することがあります。詳細については「Channel の未配信要素」を参照してください。

■ 結論

 SharedFlow と Channel の違いを理解し、適切に使い分けましょう。どちらも便利で一緒に使うことを前提に設計されています。
 しかし、BroadcastChannel は過去の遺物であり、将来的には非推奨となり削除されるでしょう。

👉 StateFlow の View への公開  
👉 【MVVM】 Kotlin Flow で使える5つの利用パターン 
👉 StateFlow は distinctUtilChanged 不要  


【YouTube】国土交通省 河川 ライブカメラ一覧

YouTube を利用したライブカメラ映像のライブ配信がここ1年で爆発的に整備されています。

江の川水系 江の川

かなり見やすくなりましたが、まだ、フォーマットにばらつきがあるようなので、一覧にしておきます。

 

北海道開発局

📺 hkd_mlitchannel
📺 国土交通省 北海道開発局 河川管理課

 

東北地方整備局

国土交通省 東北地方整備局
📺 【試験運用中】国土交通省福島河川国道事務所管内の河川カメラ ライブ映像 配信

 

関東地方整備局

国土交通省 関東地方整備局 広報チャンネル
📺 東京都水防チャンネル

 

北陸地方整備局

北陸地方整備局採用担当
📺 国土交通省 北陸地方整備局水災害対策センター

 

中部地方整備局

📺 国土交通省中部地方整備局

 

近畿地方整備局

国土交通省近畿地方整備局
📺 国土交通省 近畿地方整備局 河川部水災害予報センター

 

中国地方整備局

国土交通省中国地方整備局
📺 国土交通省中国地方整備局河川部

 

四国地方整備局

国土交通省四国地方整備局
📺 四国地方整備局 CCTV

 

九州地方整備局

国土交通省 九州地方整備局
📺 【試験配信中】 九州地方整備局

 

埋め込み不可な動画が多いので、各地域のライブ一覧的なページのリンクとなっています。

今後、未整備な地域は追加公開していくのだろうと思われます。