【Kotlin】SharedFlow と BroadcastChannel

👉 Shared flows, broadcast channels. See how shared flows made broadcast… | by Roman Elizarov | Medium 

 昔々、Kotlin に導入されたコルーチンは軽量でした。多数のコルーチンを起動しながら、恐怖の「変更可能な状態の共有」問題を避けながらコルーチン間で通信する方法が必要でした。

 そこで、コルーチン間通信プリミティブとして Channel が追加されました。Channel は素晴らしく、コルーチン間の、1対1、1対多、多対1、多対多の通信をサポートし Channel に送信された値はすべて1回だけ受信されます。

Diagram of many-to-many channel operation

 Channel を使用して、イベントや状態の更新を複数のサブスクライバーが独立しながら受信・反応できるように配信することはできません。

 こうして BroadcastChannel が導入され、その実装として buffered と ConflatedBroadcastChannel が導入されました。これらは、しばらくの間は役に立っていましたが、設計的には行き詰っていることがわかりました。現在、kotlinx-coroutinesバージョン1.4から、より良い解決策である SharedFlow を導入しています。詳しくはこの記事をご覧ください。

■ Flow はシンプル

 最初は Channel しかなかったので、非同期シーケンスのさまざまな変換をある Channel を引数にとり、結果として別の Channel を返す関数として実装していました。これは、フィルター演算子が独自のコルーチンで実行されることを意味します。

Diagram of filter operator with channels

 このような演算子の性能は、if 文を書いた場合と比較して、決して良いとは言えませんでした。後から考えてみると、これは驚くべきことではありません。なぜなら Channel は同期プリミティブだからです。どんな Channel でも、たとえプロデューサーとコンシューマーが1つずつの場合に最適化された実装であっても、同時に通信するコルーチンをサポートする必要があり、それらの間のデータ転送には同期が必要です。非同期データストリームの上にアプリケーションアーキテクチャを構築し始めると、当然、変換の必要性が出てきて Channel のコストが発生し始めます。

 Kotlin Flow のシンプルな設計により、変換演算子の効率的な実装が可能です。基本的なケースでは、値の emit から collect までが同じコルーチン内で行われ、同期の必要はありません。

Diagram of filter operator with flows

 同期は、異なるコルーチンでの値の emit と collect が必要な場合にのみ Flow を使って導入します。

■ Flow はコールド

 しかし Flow は一般的にコールドです flow { ... } ビルダー関数によって作成された Flow は、受動的な実体です。次のようなコードを考えてみましょう。


val coldFlow = flow {
    while (isActive) {
        emit(nextEvent)
    }
}

 Flow 自体は、いかなる種類の計算にも裏付けられておらず、collect が始まるまで、それ自体ではいかなる状態も持ちません。すべての collect コルーチンは、emit コードの独自のインスタンスを実行します。

Diagram of cold flow operation

 しかし、ユーザーのアクション、外部デバイスのイベント、状態の更新などはどのように処理するのでしょうか?これらは、それらに関心を持つコードがあるかどうかとは無関係に動作します。これらは、アプリケーション内の複数のオブザーバーをサポートする必要があります。これらはいわゆるイベントのホットソースです。

■ SharedFlow

 そこで登場するのが、SharedFlow という概念です。SharedFlow は、collect されているかどうかに関わらず存在できます。SharedFlow の collector はサブスクライバーと呼ばれます。SharedFlow のすべてのサブスクライバーは、同じシーケンスの値を受け取ります。これは Channel のオーバーヘッドのほとんどを伴わない BroadcastChannel のように効果的に機能します。これにより、BroadcastChannel の概念が廃止されます。

Diagram of shared flow operation

 本質的に SharedFlow は、軽量のブロードキャストイベントバスで、アプリケーションの中で作成して使用することができます。


class BroadcastEventBus {
    private val _events = MutableSharedFlow<Event>()
    val events = _events.asSharedFlow() // read-only public view

    suspend fun postEvent(event: Event) {
        _events.emit(event) // suspends until subscribers receive it
    }
}

 新しいサブスクライバーのために保持して再生する古いイベントの数 replay や、速いエミッタと遅いサブスクライバーにクッションを提供するための extraBufferCapacity など調整パラメータを持っています。

 SharedFlow のすべてのサブスクライバーは、それぞれのコンテキストでイベントを非同期的に collect しています。エミッターは、サブスクライバーがイベントの処理を終えるまで待ちません。しかし、SharedFlow のバッファがいっぱいになると、エミッタはバッファに余裕ができるまでサスペンドします。バッファオーバフロー時のエミッタの停止は、コレクターが追いつかないときに emit を遅らせるためのバックプレッシャーとなります。バッファオーバフローに対処する別の方法は、BufferOverlow パラメータでサポートされています。

■ StateFlow

 バッファオーバフローに対処するための一般的な方法は、最も古いイベントを削除し、最も新しい最新のイベントのみを保持することです。特に、アプリケーションの状態変数をモデル化するには最適な方法です。これは、廃止されたConflatedBroadcastChannel の代わりとなり、専用の StateFlow で広く使われているケースです。


class StateModel {
    private val _state = MutableStateFlow(initial)
    val state = _state.asStateFlow() // read-only public view

    fun update(newValue: Value) {
        _state.value = newValue // NOT suspending
    }
}

 val x: StateFlow は、var x: T の非同期でオブザーバブルなカウンターパートと考えてください。val x: StateFlow は、var x: T と同様に,常に最新の値を得ることができ、実際。最新の値が唯一の重要な値であるため、更新は中断することなく常に可能です.

 StateFlow では、複雑な Channel とシンプルな Flow の性能差がはっきりと現れます。StateFlow の実装では、アロケーションフリーの更新が行われますが、これは混信した BroadcastChannel ではそうではありませんでした。

■ Channel

 さまざまな種類の SharedFlow がさまざまな種類の BroadcastChannel に取って代わるにつれ、人気のある質問は、普通の Channel はどうなるのかということです。多くの理由から、Channel は存続するでしょう。1つの理由は、Channel が多くの複雑Flow 演算子を実装するために使用される低レベルのプリミティブであることです。
 しかし、Channel には応用的な使用例もあります。Channel は、正確に一度だけ処理しなければならないイベントを処理するために使用されます*(詳細は下記の注釈を参照)。これは、通常は1人の加入者がいるが、断続的に(起動時やある種の再構成時に)加入者が全くいないイベントタイプのデザインで、加入者が現れるまで投稿されたイベントをすべて保持しなければならないという要件がある場合に起こります。


class SingleShotEventBus {
    private val _events = Channel<Event>()
    val events = _events.receiveAsFlow() // expose as flow

    suspend fun postEvent(event: Event) {
        _events.send(event) // suspends on buffer overflow
    }
}

 最初の例の SharedFlow で書かれた BroadcastEventBus も、Channel で書かれたこの SingleShotEventBus も、イベントを Flow として公開していますが、重要な違いがあります。

SharedFlow では、イベントは未知数(ゼロ以上)のサブスクライバーにブロードキャストされます。サブスクライバーがいない場合、ポストされたイベントは直ちにドロップされます。これは、すぐに処理しなければならないイベントや、まったく処理しないイベントに使用するデザインパターンです。

 Channel では、各イベントが1人の購読者に配信されます。購読者のいないイベントを投稿しようとすると、Channel のバッファがいっぱいになるとすぐに中断され、購読者が現れるのを待ちます。投稿されたイベントはドロップされません。

 Channel を持つ SingleShotEventBus の実装では、キャンセルがない場合に限り、ポストされた各イベントを正確に1回だけ処理することに注意してください。Flow のサブスクライバーがキャンセルされると、イベントの配信に失敗することがあります。詳細については「Channel の未配信要素」を参照してください。

■ 結論

 SharedFlow と Channel の違いを理解し、適切に使い分けましょう。どちらも便利で一緒に使うことを前提に設計されています。
 しかし、BroadcastChannel は過去の遺物であり、将来的には非推奨となり削除されるでしょう。

👉 StateFlow の View への公開  
👉 【MVVM】 Kotlin Flow で使える5つの利用パターン 
👉 StateFlow は distinctUtilChanged 不要  


Kotlin で Result

どう書くべきでしょうか、リポジトリが返す Result。

data クラス で書きますか?

それとも、Kotlin ビルトインのを使いますか?

👉 Result - Kotlin Programming Language 

 

Sealed クラス で書くべし

enum の拡張的なイメージで使いましょう、

👉 Sealed Classes - Kotlin Programming Language 

👉 architecture-samples/Result.kt

できるだけ長く広く便利に使えるものがいいですよね。

👉 【MVVM】Flow vs LiveData 


【MVVM】Flow vs LiveData

👉 Using LiveData & Flow in MVVM — Part I - ProAndroidDev 

Kotlin Flow の登場で盛り上がってきました。

どれにします? どの流れにします?

Repository

Result を返す。

Flow<Result>を返す。

ViewModel

Result を受けて、LiveData<Result> を渡す。

Flow<Result> を受けて、LiveData<Result> を渡す。


Fragment

LiveData<Result> を受け取る。

Flow<Result> を受け取る。


override fun onActivityCreated(savedInstanceState: Bundle?) {
  super.onActivityCreated(savedInstanceState)

  viewModel = ViewModelProviders.of(
      this,
      viewModelFactory
  ).get(WeatherForecastDataStreamFlowViewModel::class.java)

  // Consume data when fragment is started
  lifecycleScope.launchWhenStarted {

    // Since collect is a suspend function it needs to be called
    // from a coroutine scope
    viewModel.weatherForecast.collect {
      when (it) {
        Result.Loading -> {
          Toast.makeText(context, "Loading", Toast.LENGTH_SHORT).show()
        }
        is Result.Success -> {
          tvDegree.text = it.data.toString()
        }
        Result.Error -> {
          Toast.makeText(context, "Error", Toast.LENGTH_SHORT).show()
        }
      }
    }
  }
}

WeatherForecastDataStreamFlowFragment #L47-L75

まとめ

とはいえ、今はまだ、完全に LiveData は捨てれんよの。

👉 Kotlin で Result 
👉 Kotlin Flow vs Android LiveData - Stack Overflow 
👉 From RxJava 2 to Kotlin Flow: Threading - ProAndroidDev 

追記: ホットな Flow が登場したので以下。

👉 【MVVM】 Kotlin Flow で使える5つの利用パターン 


「Kotlinx Json」の登場でサードパーティJSONライブラリは不要となる。

JSONのライブラリ何を使ってますか?

👉 square/moshi: A modern JSON library for Kotlin and Java. 
👉 FasterXML/jackson: Main Portal page for the Jackson project 
👉 google/gson: A Java serialization/deserialization library to convert Java Objects into JSON and back 

これらは、Javaベースで書かれています。Kotlin は100%相互運用可能ですが微妙に期待しない挙動をします。


data class User(
    val name: String,
    val email: String,
    val age: Int = 13,
    val role: Role = Role.Viewer
)

enum class Role { Viewer, Editor, Owner }


{
    "name" : "John Doe",
    "email" : "[email protected]"
}


class JsonUnitTest {

    private val jsonString = """
            {
                "name" : "John Doe",
                "email" : "[email protected]"
            }
        """.trimIndent()

    @Test
    fun gsonTest() {
        val user = Gson().fromJson(jsonString, User::class.java)

        assertEquals("John Doe",user.name)
        assertEquals(null, user.role)
        assertEquals(0, user.age)

//      User(name=John Doe, 
//           [email protected], 
//           age=0, 
//           role=null)

    }

}

デフォルト値が期待通りにパースできません。

kotlinx.serialization が登場!!

JetBrains産です。間違いないでしょう。



👉 Kotlin/kotlinx.serialization: Kotlin multiplatform / multi-format serialization 

- クロスプラットフォーム
- 非リフレクション
- アノテーション @Serializable
- Kotlin v1.3.30+


@Serializable
data class User(
    val name: String,
    val email: String,
    val age: Int = 13,
    val role: Role = Role.Viewer
)

enum class Role { Viewer, Editor, Owner }

class JsonUnitTest {

    private val jsonString = """
            {
                "name" : "John Doe",
                "email" : "[email protected]"
            }
        """.trimIndent()

    @Test
    fun jsonTest() {
        val user = Json.parse(User.serializer(), jsonString)

        assertEquals("John Doe", user.name)
        assertEquals(Role.Viewer, user.role)
        assertEquals(13, user.age)

//      User(name=John Doe, 
//           [email protected], 
//           age=13, 
//           role=Viewer)

    }
}

Gson では無視されていたデフォルト値がきちんと使用されます。

以下のセットアップでどうぞ。


buildscript {
    ext.kotlin_version = '1.3.60'
    repositories { jcenter() }

    dependencies {
        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
        classpath "org.jetbrains.kotlin:kotlin-serialization:$kotlin_version"
    }
}


apply plugin: 'kotlin' 
apply plugin: 'kotlinx-serialization'


repositories {
    jcenter()
}

dependencies {
    implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
    implementation "org.jetbrains.kotlinx:kotlinx-serialization-runtime:0.14.0" // JVM dependency
}

👉 Kotlinx Json vs Gson - Juraj Kušnier - Medium 



今どきの「ネット接続してるかどうか」のコード。

ググると公式リファレンスが見つかる。


val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
val activeNetwork: NetworkInfo? = cm.activeNetworkInfo
val isConnected: Boolean = activeNetwork?.isConnectedOrConnecting == true

しかし、このコードはAPI29で非推奨。

Note: getActiveNetworkInfo() was deprecated in Android 10. Use NetworkCallbacks instead for apps that target Android 10 (API level 29) and higher.

👉 Monitor connectivity status and connection metering 

Android 10 で通信状態の変更を監視するには、 ConnectivityManager.registerNetworkCallback() を使いましょう。

👉 Android 10 時代の Connectivity Monitoring - takasfz blog 

- コールバックの登録/解除のタイミングはライフサイクルに依存する。
- コールバック onAvailable()/onLost() は、非メインスレッド上で受信。

よって、MVVM上で使うとなると、LiveData化するのが良さげ。


class ConnectivityLiveData @VisibleForTesting internal constructor(private val connectivityManager: ConnectivityManager)
    : LiveData<Boolean>() {

    @RequiresPermission(android.Manifest.permission.ACCESS_NETWORK_STATE)
    constructor(application: Application) : this(application.getSystemService(Context.CONNECTIVITY_SERVICE)
        as ConnectivityManager)

    private val networkCallback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network?) {
            postValue(true)
        }

        override fun onLost(network: Network?) {
            postValue(false)
        }
    }

    override fun onActive() {
        super.onActive()

        val activeNetwork: NetworkInfo? = connectivityManager.activeNetworkInfo
        postValue(activeNetwork?.isConnectedOrConnecting == true)

        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            connectivityManager.registerDefaultNetworkCallback(networkCallback)
        } else {
            val builder = NetworkRequest.Builder()
            connectivityManager.registerNetworkCallback(builder.build(), networkCallback)
        }
    }

    override fun onInactive() {
        super.onInactive()
        connectivityManager.unregisterNetworkCallback(networkCallback)
    }
}

👉 ConnectivityLiveData - AndroidPub 

まとめ

ただ「ネットが使える/使えない」を判定させるだけの実装がここまでややこしいSDKてのは問題じゃねえか?!

みんな混乱してるようにみえるけども。

もっといい感じがあれば教えてね!