【Flow】 shareIn() と stateIn()

意訳。

Flow.shareIn()

Flow.shareIn
コールドフローを、指定されたコルーチンスコープで開始されるホットな SharedFlow に変換し、上流側フローの単一の実行インスタンスからの emit を複数の下流側サブスクライバと共有し、指定された数の replay 値を新しいサブスクライバに再生します。SharedFlow の一般的な概念についてはドキュメントを参照してください。

共有コルーチンの開始は、started パラメータで制御され、以下のオプションがサポートされています。

- Eagerly
最初のサブスクライバが現れる前から上流のフローが開始されます。この場合、replay パラメータで指定された最新の値を超えて上流から emit された全ての値は直ちに破棄されますのでご注意下さい。

- Lazily
最初のサブスクライバが現れた後に上流のフローを開始します。この場合、最初のサブスクライバが emit されたすべての値を取得することが保証されますが、後続のサブスクライバは最新の replay 値のみを取得することが保証されます。すべてのサブスクライバがいなくなっても、上流のフローは継続してアクティブですが、サブスクライバがいない場合は、最新の replay 値のみがキャッシュされます。

- WhileSubscribed()
最初のサブスクライバが現れたときに上流のフローを開始し、最後のサブスクライバが消えたときに即座に停止し、リプレイ・キャッシュを永遠に維持します。WhileSubscribed() には、ドキュメントで説明されているように、追加のオプション設定パラメータがあります。

- SharingStarted
インターフェースを実装することで、カスタムストラテジーを提供することができます。

shareIn オペレータは、作成や維持にコストがかかるコールドフローがあり、その値を収集する複数のサブスクライバがある場合に便利です。

例えば、バックエンドから高コストなネットワーク接続を介してメッセージが送られてきて、その確立に多くの時間を要するフローを考えてみましょう。

概念的には次のように実装されます。


val backendMessages: Flow<Message> = flow {
  connectToBackend() // takes a lot of time
  try {
    while (true) {
      emit(receiveMessageFromBackend())
    }
  } finally {
    disconnectFromBackend()
  }
}

このフローをアプリケーションで直接使用する場合は、収集するたびに新しい接続が確立されるため、メッセージが流れ始めるまでに時間がかかります。しかし、次のように1つのコネクションを共有して、それを確立することができます。


val messages: SharedFlow<Message> = backendMessages.shareIn(scope, SharingStarted.Eagerly)

これで messages から1つのコネクションがすべてのコレクター間で共有され,必要なときにはコネクションを確立しておくことができます。

* 上流の完了とエラー処理

上流側フローの通常の完了は、サブスクライバには影響を与えず、共有コルーチンは継続して実行されます。SharingStarted.WhileSubscribed が使用されている場合は、上流側が再び再開されます。それの完了時に特別なアクションが必要な場合は、shareIn オペレータの前に onCompletion オペレータを使用して、以下のように特別な値を emit することができます。


backendMessages
  .onCompletion { cause -> if (cause == null) emit(UpstreamHasCompletedMessage) }
  .shareIn(scope, SharingStarted.Eagerly)

上流のフローで例外が発生した場合、どのサブスクライバにも影響を与えることなく共有コルーチンが終了し、それが起動したスコープで処理されます。shareIn オペレータの前に catch や retry オペレータを使用することで、カスタムの例外処理を設定することができます。例えば,IOException が発生したときに1秒の遅延で接続を再試行するには,次のようにします.


val messages = backendMessages
  .retry { e ->
    val shallRetry = e is IOException // 他の例外はバグ
    if (shallRetry) delay(1000)
    shallRetry
  }
  .shareIn(scope, SharingStarted.Eagerly)

* 初期値

上流がまだデータをロード中であることをサブスクライバに知らせるために、特別な初期値が必要な場合は、上流のフローに onStart オペレータを使用します。以下のようになります。


backendMessages
  .onStart { emit(UpstreamIsStartingMessage) }
  .shareIn(scope, SharingStarted.Eagerly, 1) // 最新のメッセージを1つ再生する

* buffer と conflate

shareIn オペレータは、別のコルーチンで上流のフローを実行し、buffer オペレータの説明にあるように、replay サイズまたはデフォルト (大きい方) のバッファを使用して、上流からの emit をバッファリングします。このデフォルトのバッファリングは,shareIn コールの前に buffer または conflate を付けることで,明示的なバッファ設定で上書きすることができます。

- buffer(0).shareIn(scope, started, 0) は、デフォルトのバッファサイズを上書きし、バッファのない SharedFlow を作成します。実際には、上流のエミッタとサブスクライバの間で順次処理が行われ、すべてのサブスクライバが値を処理するまでエミッタが停止するように設定されます。なお、サブスクライバがいない場合でも、値は直ちに破棄されます。

- buffer(b).shareIn(scope, started, r), replay = r, extraBufferCapacity = b の SharedFlow を作成します。

- conflate().shareIn(scope, started, r) が作成されます。

Flow.stateIn()

Flow.stateIn
コールドフローを、与えられたコルーチンスコープで開始される ホットな StateFlow に変換し、上流のフローの単一の実行インスタンスから最も新しく emit された値を複数の下流のサブスクライバと共有します。StateFlow の一般的な概念についてはドキュメントを参照してください。

共有コルーチンの開始は、shareIn オペレータのドキュメントで説明されているように、started パラメータによって制御されます。

stateIn オペレータは、ある状態の値の更新を提供するコールドフローがあり、作成や維持にコストがかかるが、最新の状態の値を収集する必要がある複数のサブスクライバがいる場合に便利です。例えば、バックエンドから高価なネットワーク接続を介して状態の更新が行われ、その確立に多くの時間がかかるフローを考えてみましょう。概念的には次のように実装されます。


val backendState: Flow<State> = flow {
  connectToBackend() // takes a lot of time
  try {
    while (true) {
      emit(receiveStateUpdateFromBackend())
    }
  } finally {
    disconnectFromBackend()
  }
}

このフローをアプリケーションで直接使用する場合、フローが収集されるたびに新しい接続が確立されるため、状態の更新が流れ始めるまでにしばらく時間がかかります。しかし、次のように1つのコネクションを共有し、それを熱心に確立することができます。


val state: StateFlow<State> = backendMessages.stateIn(scope, SharingStarted.Eagerly, State.LOADING)

これで、state から全てのコレクターの間で1つのコネクションが共有され、必要になった時には既にコネクションが確立されている可能性があります。

* 上流の完了とエラー処理

上流フローの正常な完了は、サブスクライバには影響を与えず、共有コルーチンは継続して実行されます。SharingStarted.WhileSubscribed が使用されている場合は、上流側が再び再開されます。その完了時に特別なアクションが必要な場合は、stateIn オペレータの前に onCompletion オペレータを使用して値を出力することができます。shareIn オペレータのドキュメントを参照してください。

上流のフローで例外が発生した場合、どのサブスクライバにも影響を与えることなく共有コルーチンが終了し、共有コルーチンが起動したスコープで処理されます。カスタム例外処理は、shareIn オペレータと同様に、stateIn の前に catch や retry オペレータを使用して設定できます。

👉 【MVVM】 Kotlin Flow で使える5つの利用パターン 
👉 【Kotlin】Flow の挙動やライフサイクルをログで確認する hatena-bookmark


ContentProvider を Flow 化する方法 - CashApp Cooper

cashapp/cooper


fun ContentResolver.observeQuery(
  uri: Uri,
  projection: Array<String>? = null,
  selection: String? = null,
  selectionArgs: Array<String>? = null,
  sortOrder: String? = null,
  notifyForDescendants: Boolean = false
): Flow<Query> {
  val query = ContentResolverQuery(this, uri, projection, selection, selectionArgs, sortOrder)
  return flow {
    emit(query)


    val channel = Channel<Unit>(CONFLATED)
    val observer = object : ContentObserver(mainThread) {
      override fun onChange(selfChange: Boolean) {
        channel.offer(Unit)
      }
    }


    registerContentObserver(uri, notifyForDescendants, observer)
    try {
      for (item in channel) {
        emit(query)
      }
    } finally {
      unregisterContentObserver(observer)
    }
  }
}

👉 FlowContentResolver.kt#L43-L90
👉 copper/FlowContentResolver.kt at trunk · cashapp/copper

Kotlin coroutines Flow や RxJava Observable を使ったリアクティブクエリ用の ContentProvider のラッパーです。

使用方法


implementation 'app.cash.copper:copper-flow:1.0.0'

ContentResolver で query() を observeQuery() に変更することで、リアクティブ版を実現します。


contentResolver.observeQuery(uri).collect { query ->
  query.run()?.use { cursor ->
    // ...
  }
}

query() とは異なり、observeQuery() は Query オブジェクトを返します。このオブジェクトは、カーソルの基礎となるクエリを実行するために run() を呼び出す必要があります。これにより、値をキャッシュする中間オペレータがリソースをリークすることなく、コンシューマーがカーソルのライフタイム全体にアクセスできるようになります。

cursor を直接処理する代わりに、含まれる値をセマンティックタイプに変換するためのオペレータを提供しています。


contentResolver.observeQuery(uri)
  .mapToOne { cursor ->
    Employee(cursor.getString(0), cursor.getString(1))
  }
  .collect {
    println(it)
  }


Employee(id=bob, name=Bob Bobberson)

mapToOne オペレータは、1 つの行を返すクエリを受け取り、ラムダを起動してカーソルを希望の型にマッピングします。クエリがゼロまたは1行を返す場合は、コルーチン成果物には mapToOneOrNull オペレータがあり、RxJava成果物には mapToOptional 演算子があります。

クエリがリストを返す場合は、同じラムダでmapToListを呼び出します。


contentResolver.observeQuery(uri)
  .mapToList { cursor ->
    Employee(cursor.getString(0), cursor.getString(1))
  }
  .collect {
    println(it)
  }


[Employee(id=alice, name=Alice Alison), Employee(id=bob, name=Bob Bobberson)]

安定の神Jake産です。

👉 【SQLDelight 】Query を Flow 化するプラグイン 


【SQLDelight 】Query を Flow 化するプラグイン

To consume a query as a Flow, depend on the Coroutines extensions artifact and use the extension method

Room + LiveData と同様に SQLDelight でも簡単にFlow化できます。


val players: Flow<List<HockeyPlayer>> =
  playerQueries.selectAll()
    .asFlow()
    .mapToList()

👉 Coroutines - SQLDelight

当然、クエリーの結果の変化を検知して emit します。


@JvmName("toFlow")
fun <T : Any> Query<T>.asFlow(): Flow<Query<T>> = flow {
  val channel = Channel<Unit>(CONFLATED)
  channel.trySend(Unit)

  val listener = object : Query.Listener {
    override fun queryResultsChanged() {
      channel.trySend(Unit)
    }
  }

  addListener(listener)
  try {
    for (item in channel) {
      emit(this@asFlow)
    }
  } finally {
    removeListener(listener)
  }
}

👉 FlowExtensions.kt#L35-L54

使えます。

👉 SQLDelight で View を使うべし 
👉 Turbine で Kotlin coroutine Flow をテストする | #android ファショ通