Kotlin コルーチン (coroutine) でやりたい「非同期直列」

いまや JSON な REST API を使うとき、よくある「非同期な連続するリクエスト」。

かんたんに書きたいですよね。

Kotlin で非同期といえばコルーチンですよね。

しかし「コルーチン」とカタカナで書くと気持ち悪いですね!

experiment で仕様もまだまだ更新しているようですが、自分なりに咀嚼してその瞬間で答えを持っておくことは大事なことと思っています。

なので、少しやってみました!

きっかけ

楽勝と思っていました。
すいません。

準備

実行しているスレッド名をわかりやすくログに表示させるようにしておきます。


private fun log(message: String = "") {
  println("[%s] %s".format(Thread.currentThread().name, message))
}

試していく

任意の Activityの中で実行します。

まずは、


log("START")
for (i in 1..5) {
  log("$i")
  Thread.sleep(1000)
}
log("END")


I/System.out: [main] START
I/System.out: [main] 1
I/System.out: [main] 2
I/System.out: [main] 3
I/System.out: [main] 4
I/System.out: [main] 5
24334-24334/com.example.helloworld I/System.out: [main] END

これを launch で非同期処理にします。


log("START")
launch {
  log("START")
  for (i in 1..5) {
    log("$i")
    delay(1000)
  }
  log("END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] START
I/System.out: [ForkJoinPool.commonPool-worker-1] 1
I/System.out: [ForkJoinPool.commonPool-worker-1] 2
I/System.out: [ForkJoinPool.commonPool-worker-1] 3
I/System.out: [ForkJoinPool.commonPool-worker-1] 4
I/System.out: [ForkJoinPool.commonPool-worker-1] 5
I/System.out: [ForkJoinPool.commonPool-worker-1] END

非同期処理には「キャンセル」処理は必須です。ユーザからのイベント以外にもAndroidアプリには必ずライフサイクルがつきまといます。


log("START")
val job = launch {
  log("START")
  for (i in 1..5) {
    log("$i")
    delay(1000)
  }
  log("END")
}
log("END")

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] START
I/System.out: [ForkJoinPool.commonPool-worker-1] 1
I/System.out: [ForkJoinPool.commonPool-worker-1] 2
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@26aa10a

並列で非同期処理。


log("START")
launch {
  log("LAUNCH - START")
  async {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }
  async {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }
  log("LAUNCH - END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - B - END

直列で非同期処理。


log("START")
launch {
  log("LAUNCH - START")
  async {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }.join()
  async {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }.join()
  log("LAUNCH - END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 4
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - END
I/System.out: [ForkJoinPool.commonPool-worker-3] LAUNCH - END

これに、先程のキャンセル処理を加えて、直列 + キャンセル処理 に。
coroutineContext を使って、親をキャンセルで子もキャンセルします。


log("START")
val job = launch {

  log("LAUNCH - START")

  async(coroutineContext) {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }.join()

  async(coroutineContext) {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }.join()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [main] Canceled!! StandaloneCoroutine{ Cancelled }@d4f0b6b

それぞれの非同期処理から戻り値を取ります。


// 直列 + 戻り値 + キャンセル
log("START")
val job = launch {

  log("LAUNCH - START")

  val taskA = async(coroutineContext) {
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  log("A : ${taskA.await()}")

  val taskB = async(coroutineContext) {
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  log("B : ${taskB.await()}")

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] A : 12345
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 2
I/System.out: [main] Canceled!! StandaloneCoroutine{ Cancelled }@5dc2361

戻り値はUIスレッドで受けたいですよね。


// 直列 + 戻り値 (-> UIスレッド) + キャンセル
log("START")
val job = launch(UI) { // @

  log("LAUNCH - START")

  val taskA = async(coroutineContext) { // @
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  val valueA = taskA.await()
  log("A : $valueA")
  Toast.makeText(this@MainActivity, "A : $valueA", Toast.LENGTH_SHORT).show()

  val taskB = async(coroutineContext) { // @
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  val valueB = taskB.await();
  log("B : $valueB")
  Toast.makeText(this@MainActivity, "B : $valueB", Toast.LENGTH_SHORT).show()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [main] LAUNCH - START
I/System.out: [main] ASYNC - A - START
I/System.out: [main] A - 1
I/System.out: [main] A - 2
I/System.out: [main] A - 3
I/System.out: [main] A - 4
I/System.out: [main] A - 5
I/System.out: [main] ASYNC - A - END
I/System.out: [main] A : 12345
I/System.out: [main] ASYNC - B - START
I/System.out: [main] B - 1
I/System.out: [main] B - 2
I/System.out: [main] B - 3
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@20ac62d

動き的にはここまででいいのかもしれませんが、非同期処理が [main] で実行されているのが気になります。

coroutine内では「+」を使ってコンテキストを override できるようです。


// 直列(バックグラウンド) + 戻り値 (-> UIスレッド) + キャンセル
log("START")
val job = launch(UI) {

  log("LAUNCH - START")

  val taskA = async(coroutineContext + CommonPool) { // @
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  val valueA = taskA.await()
  log("A : $valueA")
  Toast.makeText(this@MainActivity, "A : $valueA", Toast.LENGTH_SHORT).show()

  val taskB = async(coroutineContext + CommonPool) { // @
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  val valueB = taskB.await();
  log("B : $valueB")
  Toast.makeText(this@MainActivity, "B : $valueB", Toast.LENGTH_SHORT).show()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [main] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - A - END
I/System.out: [main] A : 12345
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@d8e310c

これで、一応、やりたかったことは達成できました。

知らんけど。

まとめ

実装を見据えて、Presenter向けにコピペテンプレートにしておきます。


private lateinit var job: Job

fun startPresenter() {
  job = loadData()
}

fun stopPresenter() {
  job.cancel()
}

fun loadData() = launch(UI) {

  view.showLoading()
  
  val task1 = async(coroutineContext + CommonPool) {
    provider.loadData1()
  }
  val result1 = task1.await()

  val task2 = async(coroutineContext + CommonPool) {
    provider.loadData2(result1)
  }
  val result2 = task2.await()

  view.showData(result2)
}

結局は、「コルーチンコンテキストの取扱い」だったことがよくわかります。

Kotlin で 非同期処理 Coroutine #1 ~ launch(), async()
Cancellation delivering design · Issue #114 · Kotlin/kotlinx.coroutines
Intrinsic suspend val coroutineContext : KT-17609


Kotlin で 非同期処理 Coroutine #1 ~ launch(), async()

ネット上を調べてみてもよくわかりません。

難しい言葉や experimental な仕様の変更などあったりして。

少しづつ試してみながらマスターしていきましょう。

// build.gradle

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:0.22.2"

//gradle.properties

kotlin.coroutines=enable

kotlinx.coroutines/coroutines-guide-ui.md at master · Kotlin/kotlinx.coroutines

まず、これ。


for (i in 1..10) {
  Timber.d("$i")
  Thread.sleep(1000)
}

非同期にしたいですよね。
launch から始めます。


launch { // @
  for (i in 1..10) {
    Timber.d("$i")
    Thread.sleep(1000)
  }
}


launch {
  for (i in 1..10) {
    Timber.d("$i")
    delay(1000)  // @
  }
}

引数をつけて渡す。

UI :
UIスレッドで実行。

CommonPool :
バックグランドスレッドで実行。


launch(UI) {  // @
  for (i in 1..10) {
    Timber.d("$i")
    delay(1000)
  }
}


launch(CommonPool) {  // @
  for (i in 1..10) {
    Timber.d("$i")
    delay(1000)
  }
}


launch(UI + CommonPool) {  // @
  for (i in 1..10) {
    Timber.d("$i")
    delay(1000)
  }
}

launch() の戻りからキャンセルできます。


val job = launch(UI) {  // @
  for (i in 1..10) {
    Timber.d("$i")
    delay(1000)
  }
}

fab.setOnClickListener {
  job.cancel() // @
}

 

まとめ

UIスレッドに限定されたコルーチンは、UIスレッドをブロックすることなく、UI内の何かを自由に更新して中断することができます。

delay が待っている間UIスレッドはブロックされないのでUIはフリーズしません。ただ単にコルーチンを中断します。

Job.cancelは完全にスレッドセーフでノンブロッキングです。
実際に終了するのを待つことなく、コルーチンがそのジョブをキャンセルするように通知するだけです。 どこからでも呼び出すことができます。

基本的な非同期呼び出しは、launch() と async() の2つ。似ているが戻りが異なる。


USB3.0 が WiFi 2.4Ghz に干渉しすぎて無駄にケーブルやアダプタが増えて金は減る

MacにUSB3.0ケーブルを差すとはっきりWiFiの接続が切れまくる。

もしや、と思い調べてみると。

USB3.0 機器から発生する高周波ノイズも、2.4Ghz無線LAN機器に大きく干渉し、良くない影響(通信速度の低下や通信断)が発生するようです。

USB3.0機器から発生するノイズは、2.4Ghz無線LANに悪影響を与えるらしい。 - ぼくんちのTV 別館

え、中華なケーブルやアダプタって買ってみて気づくこと多くない?

ネットのレビューやランキングって当てにならなくねえ?...

Apple 公式にも。

USB 3 デバイスがコンピュータに接続されていると、Wi-Fi または Bluetooth デバイスが正常に動作しないことがあります。なぜですか?

一部の USB 3 デバイスは、無線周波数干渉を引き起こすことがあります。これによって、2.4 GHz 帯を使用する Wi-Fi および Bluetooth デバイスとコンピュータとの通信に問題が生じる場合があります。

Mac で USB デバイスを使う - Apple サポート

この、WiFiの周波数の話は以前混線が気になっていたのが、もう4年前とか。

公衆無線LANとかWiFiが途切れる場合「5GHz帯」に切り替えたほうがいいんぢゃね?

今、同じアプリで計測してみると、2.4GHzはあいかわらず混雑で、5.0GHzも混んで来ている模様。

ルータを見てみると、今では旧型となり、2.4Ghz オンリー。

当然、飛んでいるWiFiのSSIDにも -g -gw のみ。


○○-a	IEEE802.11n	5GHz帯	        600Mbps
○○-aw	IEEE802.11a	5GHz帯	        54Mbps
○○-g	IEEE802.11n	2.4GHz帯	600Mbps
○○-gw	IEEE802.11g	2.4GHz帯	54Mbps

無線LANルーターのwifiのID内にある「g」「a」「gw」「aw」の意味 | JoyPlotライフ

買い替えますか、WiFiルータ。

【価格.com】無線LANルーター(Wi-Fiルーター) | 通販・価格比較・製品情報

最近のガジェットて使えるものなのに無駄に購入したりしてません?

もう2000円のルータを買おうと思います。 ← これが原因か! (笑)

👉 「これは分からないわ…」Mac miniで2.4GHz帯のWi-Fiが全く通信できないので調べてみたら1000円くらいで買った中華製のHDDケースに付いてたUSBケーブルが原因だった - Togetter 
👉 公衆無線LANとかWiFiが途切れる場合「5GHz帯」に切り替えたほうがいいんぢゃね?