ContentProvider を Flow 化する方法 - CashApp Cooper

cashapp/cooper


fun ContentResolver.observeQuery(
  uri: Uri,
  projection: Array<String>? = null,
  selection: String? = null,
  selectionArgs: Array<String>? = null,
  sortOrder: String? = null,
  notifyForDescendants: Boolean = false
): Flow<Query> {
  val query = ContentResolverQuery(this, uri, projection, selection, selectionArgs, sortOrder)
  return flow {
    emit(query)


    val channel = Channel<Unit>(CONFLATED)
    val observer = object : ContentObserver(mainThread) {
      override fun onChange(selfChange: Boolean) {
        channel.offer(Unit)
      }
    }


    registerContentObserver(uri, notifyForDescendants, observer)
    try {
      for (item in channel) {
        emit(query)
      }
    } finally {
      unregisterContentObserver(observer)
    }
  }
}

👉 FlowContentResolver.kt#L43-L90
👉 copper/FlowContentResolver.kt at trunk · cashapp/copper

Kotlin coroutines Flow や RxJava Observable を使ったリアクティブクエリ用の ContentProvider のラッパーです。

使用方法


implementation 'app.cash.copper:copper-flow:1.0.0'

ContentResolver で query() を observeQuery() に変更することで、リアクティブ版を実現します。


contentResolver.observeQuery(uri).collect { query ->
  query.run()?.use { cursor ->
    // ...
  }
}

query() とは異なり、observeQuery() は Query オブジェクトを返します。このオブジェクトは、カーソルの基礎となるクエリを実行するために run() を呼び出す必要があります。これにより、値をキャッシュする中間オペレータがリソースをリークすることなく、コンシューマーがカーソルのライフタイム全体にアクセスできるようになります。

cursor を直接処理する代わりに、含まれる値をセマンティックタイプに変換するためのオペレータを提供しています。


contentResolver.observeQuery(uri)
  .mapToOne { cursor ->
    Employee(cursor.getString(0), cursor.getString(1))
  }
  .collect {
    println(it)
  }


Employee(id=bob, name=Bob Bobberson)

mapToOne オペレータは、1 つの行を返すクエリを受け取り、ラムダを起動してカーソルを希望の型にマッピングします。クエリがゼロまたは1行を返す場合は、コルーチン成果物には mapToOneOrNull オペレータがあり、RxJava成果物には mapToOptional 演算子があります。

クエリがリストを返す場合は、同じラムダでmapToListを呼び出します。


contentResolver.observeQuery(uri)
  .mapToList { cursor ->
    Employee(cursor.getString(0), cursor.getString(1))
  }
  .collect {
    println(it)
  }


[Employee(id=alice, name=Alice Alison), Employee(id=bob, name=Bob Bobberson)]

安定の神Jake産です。

👉 【SQLDelight 】Query を Flow 化するプラグイン 


【SQLDelight 】Query を Flow 化するプラグイン

To consume a query as a Flow, depend on the Coroutines extensions artifact and use the extension method

Room + LiveData と同様に SQLDelight でも簡単にFlow化できます。


val players: Flow<List<HockeyPlayer>> =
  playerQueries.selectAll()
    .asFlow()
    .mapToList()

👉 Coroutines - SQLDelight

当然、クエリーの結果の変化を検知して emit します。


@JvmName("toFlow")
fun <T : Any> Query<T>.asFlow(): Flow<Query<T>> = flow {
  val channel = Channel<Unit>(CONFLATED)
  channel.trySend(Unit)

  val listener = object : Query.Listener {
    override fun queryResultsChanged() {
      channel.trySend(Unit)
    }
  }

  addListener(listener)
  try {
    for (item in channel) {
      emit(this@asFlow)
    }
  } finally {
    removeListener(listener)
  }
}

👉 FlowExtensions.kt#L35-L54

使えます。

👉 SQLDelight で View を使うべし 
👉 Turbine で Kotlin coroutine Flow をテストする | #android ファショ通 


【Kotlin】Flow flatMap* を ネストするか チェインするか【coroutine】

kotlin coroutin flow のオペレータをネストするかチェインするかの話です。

👉 Kotlin flow: Nesting vs Chaining • Vasya Drobushkov 

Flow 間のデータ受け渡し


observeUser()
  .flatMap { user ->
    api.load(user.id)
      .flatMapLatest { data -> api.send(user.id, data) }
  }
  .collect()


observeUser()
  .flatMap { user ->
    api.load(user.id)
  }
  .flatMap { data -> api.send(user.id, data) } // ! user is not accessible
  .collect()

An important observation is that nesting unlike chaining creates scope. And one of the simplest things one can do with the scope is to share some data inside it.

重要なことは、チェインとは異なり、ネストによってスコープが作成されることです。そして、スコープで実行できる最も簡単なことの1つは、スコープ内のデータを共有することです。

キャンセルの伝達


observeUser()
  .flatMapLatest { user ->
    api.load(user.id)
      .flatMapLatest { observeLocation() }
  }
  .collect()


observeUser()
  .flatMapLatest { user ->
    api.load(user.id)
  }
  .flatMapLatest { observeLocation() }
  .collect()

Here we again used nesting, while we don’t need to pass any data to the observeLocation stream. Additionally, instead of flatMap we’ve used flatMapLatest (in RxJava it is called switchMap) - if the new value will be sent by upstream the downstream will be canceled and a new one created. This ensures that if the user was changed (e.g. account switched) we’ll trigger the server once again to determine whether we need to observe location.

observeLocation ストリームには何のデータも渡す必要はありません。RxJavaではswitchMapと呼ばれる flatMapLatest を使用しています。新しい値がアップストリームで送信されると、ダウンストリームはキャンセルされ、新しい値が作成されます。これにより、ユーザーが変更された場合(例えば、アカウントが変更された場合)、位置情報をobserveする必要があるかどうかを判断するために、もう一度サーバーを起動することができます。

because in the case with nesting we’ve defined the scope that has lifecycle attached to the observeUser stream: when the user is changed - everything inside flatMapLatest will be canceled. And in the case of chaining, we have observeLocation outside of user scope - so when the user changed, the location stream is not canceled.

ネスティングの場合は、observUserストリームにライフサイクルが付随するスコープを定義しているため、ユーザーが変更されると、flatMapLatest内のすべてがキャンセルされます。また、チェイニングの場合は、ユーザースコープの外側にobserveLocationを設定していますので、ユーザーが変更されてもlocationストリームはキャンセルされません。

まとめ

flatMapLatest を使う場合は、入れ子のほうが意図に沿いやすいように思えるが、コード自体の見通しは悪い。

頭のどこかに「ネストかチェインか」は置いておくべきでしょう。

👉 【MVVM】 Kotlin Flow で使える5つの利用パターン | #android ファショ通