Kotlin コルーチン (coroutine) でやりたい「非同期直列」

いまや JSON な REST API を使うとき、よくある「非同期な連続するリクエスト」。

かんたんに書きたいですよね。

Kotlin で非同期といえばコルーチンですよね。

しかし「コルーチン」とカタカナで書くと気持ち悪いですね!

experiment で仕様もまだまだ更新しているようですが、自分なりに咀嚼してその瞬間で答えを持っておくことは大事なことと思っています。

なので、少しやってみました!

きっかけ

楽勝と思っていました。
すいません。

準備

実行しているスレッド名をわかりやすくログに表示させるようにしておきます。


private fun log(message: String = "") {
  println("[%s] %s".format(Thread.currentThread().name, message))
}

試していく

任意の Activityの中で実行します。

まずは、


log("START")
for (i in 1..5) {
  log("$i")
  Thread.sleep(1000)
}
log("END")


I/System.out: [main] START
I/System.out: [main] 1
I/System.out: [main] 2
I/System.out: [main] 3
I/System.out: [main] 4
I/System.out: [main] 5
24334-24334/com.example.helloworld I/System.out: [main] END

これを launch で非同期処理にします。


log("START")
launch {
  log("START")
  for (i in 1..5) {
    log("$i")
    delay(1000)
  }
  log("END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] START
I/System.out: [ForkJoinPool.commonPool-worker-1] 1
I/System.out: [ForkJoinPool.commonPool-worker-1] 2
I/System.out: [ForkJoinPool.commonPool-worker-1] 3
I/System.out: [ForkJoinPool.commonPool-worker-1] 4
I/System.out: [ForkJoinPool.commonPool-worker-1] 5
I/System.out: [ForkJoinPool.commonPool-worker-1] END

非同期処理には「キャンセル」処理は必須です。ユーザからのイベント以外にもAndroidアプリには必ずライフサイクルがつきまといます。


log("START")
val job = launch {
  log("START")
  for (i in 1..5) {
    log("$i")
    delay(1000)
  }
  log("END")
}
log("END")

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] START
I/System.out: [ForkJoinPool.commonPool-worker-1] 1
I/System.out: [ForkJoinPool.commonPool-worker-1] 2
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@26aa10a

並列で非同期処理。


log("START")
launch {
  log("LAUNCH - START")
  async {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }
  async {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }
  log("LAUNCH - END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - B - END

直列で非同期処理。


log("START")
launch {
  log("LAUNCH - START")
  async {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }.join()
  async {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }.join()
  log("LAUNCH - END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 4
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - END
I/System.out: [ForkJoinPool.commonPool-worker-3] LAUNCH - END

これに、先程のキャンセル処理を加えて、直列 + キャンセル処理 に。
coroutineContext を使って、親をキャンセルで子もキャンセルします。


log("START")
val job = launch {

  log("LAUNCH - START")

  async(coroutineContext) {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }.join()

  async(coroutineContext) {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }.join()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [main] Canceled!! StandaloneCoroutine{ Cancelled }@d4f0b6b

それぞれの非同期処理から戻り値を取ります。


// 直列 + 戻り値 + キャンセル
log("START")
val job = launch {

  log("LAUNCH - START")

  val taskA = async(coroutineContext) {
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  log("A : ${taskA.await()}")

  val taskB = async(coroutineContext) {
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  log("B : ${taskB.await()}")

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] A : 12345
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 2
I/System.out: [main] Canceled!! StandaloneCoroutine{ Cancelled }@5dc2361

戻り値はUIスレッドで受けたいですよね。


// 直列 + 戻り値 (-> UIスレッド) + キャンセル
log("START")
val job = launch(UI) { // @

  log("LAUNCH - START")

  val taskA = async(coroutineContext) { // @
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  val valueA = taskA.await()
  log("A : $valueA")
  Toast.makeText(this@MainActivity, "A : $valueA", Toast.LENGTH_SHORT).show()

  val taskB = async(coroutineContext) { // @
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  val valueB = taskB.await();
  log("B : $valueB")
  Toast.makeText(this@MainActivity, "B : $valueB", Toast.LENGTH_SHORT).show()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [main] LAUNCH - START
I/System.out: [main] ASYNC - A - START
I/System.out: [main] A - 1
I/System.out: [main] A - 2
I/System.out: [main] A - 3
I/System.out: [main] A - 4
I/System.out: [main] A - 5
I/System.out: [main] ASYNC - A - END
I/System.out: [main] A : 12345
I/System.out: [main] ASYNC - B - START
I/System.out: [main] B - 1
I/System.out: [main] B - 2
I/System.out: [main] B - 3
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@20ac62d

動き的にはここまででいいのかもしれませんが、非同期処理が [main] で実行されているのが気になります。

coroutine内では「+」を使ってコンテキストを override できるようです。


// 直列(バックグラウンド) + 戻り値 (-> UIスレッド) + キャンセル
log("START")
val job = launch(UI) {

  log("LAUNCH - START")

  val taskA = async(coroutineContext + CommonPool) { // @
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  val valueA = taskA.await()
  log("A : $valueA")
  Toast.makeText(this@MainActivity, "A : $valueA", Toast.LENGTH_SHORT).show()

  val taskB = async(coroutineContext + CommonPool) { // @
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  val valueB = taskB.await();
  log("B : $valueB")
  Toast.makeText(this@MainActivity, "B : $valueB", Toast.LENGTH_SHORT).show()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [main] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - A - END
I/System.out: [main] A : 12345
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@d8e310c

これで、一応、やりたかったことは達成できました。

知らんけど。

まとめ

実装を見据えて、Presenter向けにコピペテンプレートにしておきます。


private lateinit var job: Job

fun startPresenter() {
  job = loadData()
}

fun stopPresenter() {
  job.cancel()
}

fun loadData() = launch(UI) {

  view.showLoading()
  
  val task1 = async(coroutineContext + CommonPool) {
    provider.loadData1()
  }
  val result1 = task1.await()

  val task2 = async(coroutineContext + CommonPool) {
    provider.loadData2(result1)
  }
  val result2 = task2.await()

  view.showData(result2)
}

結局は、「コルーチンコンテキストの取扱い」だったことがよくわかります。

Kotlin で 非同期処理 Coroutine #1 ~ launch(), async()
Cancellation delivering design · Issue #114 · Kotlin/kotlinx.coroutines
Intrinsic suspend val coroutineContext : KT-17609


Kotlin で 非同期処理 Coroutine #1 ~ launch(), async()

ネット上を調べてみてもよくわかりません。

難しい言葉や experimental な仕様の変更などあったりして。

少しづつ試してみながらマスターしていきましょう。

// build.gradle

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:0.22.2"

//gradle.properties

kotlin.coroutines=enable

kotlinx.coroutines/coroutines-guide-ui.md at master · Kotlin/kotlinx.coroutines

まず、これ。


for (i in 1..10) {
  Timber.d("$i")
  Thread.sleep(1000)
}

非同期にしたいですよね。
launch から始めます。


launch { // @
  for (i in 1..10) {
    Timber.d("$i")
    Thread.sleep(1000)
  }
}


launch {
  for (i in 1..10) {
    Timber.d("$i")
    delay(1000)  // @
  }
}

引数をつけて渡す。

UI :
UIスレッドで実行。

CommonPool :
バックグランドスレッドで実行。


launch(UI) {  // @
  for (i in 1..10) {
    Timber.d("$i")
    delay(1000)
  }
}


launch(CommonPool) {  // @
  for (i in 1..10) {
    Timber.d("$i")
    delay(1000)
  }
}


launch(UI + CommonPool) {  // @
  for (i in 1..10) {
    Timber.d("$i")
    delay(1000)
  }
}

launch() の戻りからキャンセルできます。


val job = launch(UI) {  // @
  for (i in 1..10) {
    Timber.d("$i")
    delay(1000)
  }
}

fab.setOnClickListener {
  job.cancel() // @
}

 

まとめ

UIスレッドに限定されたコルーチンは、UIスレッドをブロックすることなく、UI内の何かを自由に更新して中断することができます。

delay が待っている間UIスレッドはブロックされないのでUIはフリーズしません。ただ単にコルーチンを中断します。

Job.cancelは完全にスレッドセーフでノンブロッキングです。
実際に終了するのを待つことなく、コルーチンがそのジョブをキャンセルするように通知するだけです。 どこからでも呼び出すことができます。

基本的な非同期呼び出しは、launch() と async() の2つ。似ているが戻りが異なる。


E を出力させない Double.toPlainString()

こういうの。助かります。


double d1 = 0.00000000000000000000000000000000000000000123456789012345678901234567890d;
double d2 = 1234567890123456789012345678900000000000000000000000000000000000000000000d;
double d3 = 1234567890.1234567890123456789d;

System.out.println(BigDecimal.valueOf(d1).toPlainString());
// 0.0000000000000000000000000000000000000000012345678901234568
System.out.println(BigDecimal.valueOf(d2).toPlainString());
// 1234567890123456800000000000000000000000000000000000000000000000000000000
System.out.println(BigDecimal.valueOf(d3).toPlainString());
// 1234567890.1234567

Java の double を素直な String にしたい、E 要らない - Qiita

直感的には表示できなく、ネット上を放浪していました。

Kotlin では、Extention Functions に追加しておきますわ。


fun Double.toPlainString(): String
    = BigDecimal.valueOf(this).toPlainString()


val d1 = 0.00000000000000000000000000000000000000000123456789012345678901234567890
val d2 = 1234567890123456789012345678900000000000000000000000000000000000000000000.0
val d3 = 1234567890.1234567890123456789
val d4 = 4.34e-05
val d5 = 5.6e-07

println(d1.toPlainString())
println(d2.toPlainString())
println(d3.toPlainString())
println(d4.toPlainString())
println(d5.toPlainString())


I/System.out: 0.0000000000000000000000000000000000000000012345678901234568
I/System.out: 1234567890123456800000000000000000000000000000000000000000000000000000000
I/System.out: 1234567890.1234567
I/System.out: 0.0000434
I/System.out: 0.00000056

Java Double に toPlainString() がないのは何か理由があるのかも。とは思うが。