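Doing "asynchronous, in series" with Kotlin coroutines

These days, when you work with a JSON REST API, you often need to chain asynchronous requests one after another.

You want that to be easy to write, right?

And in Kotlin, asynchronous means coroutines.

Though spelling it out in katakana (コルーチン) feels a bit off, doesn't it!

Coroutines are still experimental and the spec keeps changing, but I think it's important to digest things for yourself and have an answer that holds at that moment.

So I gave it a quick try!

Motivation

I thought this would be a breeze.
Sorry about that.

Setup

First, make the log show the name of the thread each line runs on.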


private fun log(message: String = "") {
  println("[%s] %s".format(Thread.currentThread().name, message))
}

Trying things out

Run each snippet inside any Activity.

First, the plain blocking version:


log("START")
for (i in 1..5) {
  log("$i")
  Thread.sleep(1000)
}
log("END")


I/System.out: [main] START
I/System.out: [main] 1
I/System.out: [main] 2
I/System.out: [main] 3
I/System.out: [main] 4
I/System.out: [main] 5
I/System.out: [main] END

Now make it asynchronous with launch.


log("START")
launch {
  log("START")
  for (i in 1..5) {
    log("$i")
    delay(1000)
  }
  log("END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] START
I/System.out: [ForkJoinPool.commonPool-worker-1] 1
I/System.out: [ForkJoinPool.commonPool-worker-1] 2
I/System.out: [ForkJoinPool.commonPool-worker-1] 3
I/System.out: [ForkJoinPool.commonPool-worker-1] 4
I/System.out: [ForkJoinPool.commonPool-worker-1] 5
I/System.out: [ForkJoinPool.commonPool-worker-1] END
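
A side note for anyone reading this on a newer kotlinx.coroutines release: the bare launch { } above is the experimental API. As far as I know, launch is now an extension on CoroutineScope, so a rough present-day equivalent might look like the sketch below. runBlocking, Dispatchers.Default and the record helper are my own stand-ins so it runs outside an Activity. They are assumptions, not part of the original experiment.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: runs the experiment and returns "[thread] message" lines.
fun launchDemo(): List<String> {
    val lines = CopyOnWriteArrayList<String>()
    fun record(message: String) {
        lines += "[${Thread.currentThread().name}] $message"
    }
    runBlocking {
        record("START")
        // launch returns right away; the body runs on a background pool.
        val job = launch(Dispatchers.Default) {
            record("LAUNCH - START")
            for (i in 1..3) {
                record("$i")
                delay(100)
            }
            record("LAUNCH - END")
        }
        record("END") // recorded without waiting for the loop to finish
        job.join()    // only here do we wait, so that the demo can return
    }
    return lines
}

fun main() {
    launchDemo().forEach(::println)
}
```

The exact interleaving of the first few lines can vary from run to run, just as it does in the logs in this post.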

Asynchronous work has to be cancellable. Besides user-triggered events, an Android app always has its lifecycle to deal with.


log("START")
val job = launch {
  log("START")
  for (i in 1..5) {
    log("$i")
    delay(1000)
  }
  log("END")
}
log("END")

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] START
I/System.out: [ForkJoinPool.commonPool-worker-1] 1
I/System.out: [ForkJoinPool.commonPool-worker-1] 2
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@26aa10a
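
The log says Cancelling rather than Cancelled because cancel() only requests cancellation; the coroutine notices it at its next suspension point (here, delay). Here is a minimal sketch of that, written against the current API as I understand it. The fixed delay(600) is my stand-in for the FAB tap, and cancelAndJoin is used so the final state can be checked.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Returns how many loop iterations started before cancellation took effect.
fun cancelDemo(): Int = runBlocking {
    val iterations = AtomicInteger(0)
    val job = launch(Dispatchers.Default) {
        for (i in 1..5) {
            iterations.incrementAndGet()
            delay(400) // suspension point: cancellation is noticed here
        }
    }
    delay(600)          // stand-in for the user tapping the FAB
    job.cancelAndJoin() // request cancellation, then wait until it has completed
    check(job.isCancelled && job.isCompleted)
    iterations.get()
}

fun main() {
    println("iterations before cancel: ${cancelDemo()}")
}
```

A loop that never suspends would not stop on its own; it would have to check isActive itself.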

Asynchronous work in parallel.


log("START")
launch {
  log("LAUNCH - START")
  async {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }
  async {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }
  log("LAUNCH - END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - B - END

Asynchronous work in series.


log("START")
launch {
  log("LAUNCH - START")
  async {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }.join()
  async {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }.join()
  log("LAUNCH - END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 4
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - END
I/System.out: [ForkJoinPool.commonPool-worker-3] LAUNCH - END
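
The difference between the two shapes is easy to see with a stopwatch. This is a small sketch under my own assumptions: task is a made-up stand-in for one request, and coroutineScope / runBlocking come from current kotlinx.coroutines rather than the experimental API used in this post.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Stand-in for one request: three steps of 100 ms each.
suspend fun task(name: String): String {
    for (i in 1..3) delay(100)
    return "$name done"
}

// Parallel: start both, then wait for both.
suspend fun parallel(): List<String> = coroutineScope {
    val a = async { task("A") }
    val b = async { task("B") }
    listOf(a.await(), b.await())
}

// Serial: wait for A to finish before B is even started.
suspend fun serial(): List<String> = coroutineScope {
    val a = async { task("A") }.await()
    val b = async { task("B") }.await()
    listOf(a, b)
}

fun main() = runBlocking {
    val parallelMs = measureTimeMillis { parallel() }
    val serialMs = measureTimeMillis { serial() }
    println("parallel: $parallelMs ms, serial: $serialMs ms")
}
```

The serial version should take roughly twice as long, since B's delays only begin once A has returned.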

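Now add the cancellation from earlier to get serial + cancellation.
Passing coroutineContext to each async makes it a child of the launch, so cancelling the parent cancels the children too.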


log("START")
val job = launch {

  log("LAUNCH - START")

  async(coroutineContext) {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }.join()

  async(coroutineContext) {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }.join()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [main] Canceled!! StandaloneCoroutine{ Cancelled }@d4f0b6b
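
For comparison, in current kotlinx.coroutines releases I believe a coroutine started inside another coroutine's block becomes its child by default, without passing coroutineContext by hand. The sketch below checks just that one property. The names parentCancelsChild and started are mine, and the 10-second delay is only there so the child is still running when the parent is cancelled.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns true if cancelling the parent also cancelled its child.
fun parentCancelsChild(): Boolean = runBlocking {
    val started = CompletableDeferred<Job>()
    val parent = launch(Dispatchers.Default) {
        // Started in the parent's scope, so it inherits the parent's Job.
        val child = async {
            delay(10_000)
            "never reached"
        }
        started.complete(child) // hand the child out so it can be inspected
        child.await()
    }
    val child = started.await() // wait until the child actually exists
    parent.cancelAndJoin()      // cancel the parent only
    child.isCancelled
}

fun main() {
    println("child cancelled: ${parentCancelsChild()}")
}
```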

Next, take a return value from each asynchronous task.


// serial + return values + cancellation
log("START")
val job = launch {

  log("LAUNCH - START")

  val taskA = async(coroutineContext) {
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  log("A : ${taskA.await()}")

  val taskB = async(coroutineContext) {
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  log("B : ${taskB.await()}")

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] A : 12345
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 2
I/System.out: [main] Canceled!! StandaloneCoroutine{ Cancelled }@5dc2361

You want to receive those return values on the UI thread, right?


// serial + return values (-> UI thread) + cancellation
log("START")
val job = launch(UI) { // @

  log("LAUNCH - START")

  val taskA = async(coroutineContext) { // @
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  val valueA = taskA.await()
  log("A : $valueA")
  Toast.makeText(this@MainActivity, "A : $valueA", Toast.LENGTH_SHORT).show()

  val taskB = async(coroutineContext) { // @
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  val valueB = taskB.await()
  log("B : $valueB")
  Toast.makeText(this@MainActivity, "B : $valueB", Toast.LENGTH_SHORT).show()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [main] LAUNCH - START
I/System.out: [main] ASYNC - A - START
I/System.out: [main] A - 1
I/System.out: [main] A - 2
I/System.out: [main] A - 3
I/System.out: [main] A - 4
I/System.out: [main] A - 5
I/System.out: [main] ASYNC - A - END
I/System.out: [main] A : 12345
I/System.out: [main] ASYNC - B - START
I/System.out: [main] B - 1
I/System.out: [main] B - 2
I/System.out: [main] B - 3
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@20ac62d

Behavior-wise this may be good enough, but it bothers me that the asynchronous work itself is running on [main].

Inside a coroutine, it seems you can use "+" to override part of the context.


// serial (background) + return values (-> UI thread) + cancellation
log("START")
val job = launch(UI) {

  log("LAUNCH - START")

  val taskA = async(coroutineContext + CommonPool) { // @
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  val valueA = taskA.await()
  log("A : $valueA")
  Toast.makeText(this@MainActivity, "A : $valueA", Toast.LENGTH_SHORT).show()

  val taskB = async(coroutineContext + CommonPool) { // @
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  val valueB = taskB.await()
  log("B : $valueB")
  Toast.makeText(this@MainActivity, "B : $valueB", Toast.LENGTH_SHORT).show()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [main] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - A - END
I/System.out: [main] A : 12345
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@d8e310c
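
What "+" does to a context can be checked without launching anything. A context is a set of elements indexed by key, and adding an element whose key is already present replaces it. The sketch below uses current names (Dispatchers.Default instead of CommonPool, Dispatchers.Unconfined as a stand-in for UI); overrideDispatcher is a made-up helper.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Builds a context, then overrides only its dispatcher with "+".
fun overrideDispatcher(): Pair<CoroutineContext, CoroutineContext> {
    val original = Job() + Dispatchers.Unconfined + CoroutineName("parent")
    val overridden = original + Dispatchers.Default
    return original to overridden
}

fun main() {
    val (original, overridden) = overrideDispatcher()
    // The Job, which carries cancellation, is kept...
    println(overridden[Job] === original[Job])                           // true
    // ...while the dispatcher (stored under the ContinuationInterceptor key) is replaced.
    println(overridden[ContinuationInterceptor] === Dispatchers.Default) // true
    println(overridden[CoroutineName]?.name)                             // parent
}
```

That is why async(coroutineContext + CommonPool) runs in the background yet is still cancelled along with its parent: the Job comes along, only the thread pool changes.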

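With that, I have more or less achieved what I set out to do.

Probably. Don't quote me on it.

Summary

With a real implementation in mind, here it is as a copy-paste template for a Presenter.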


private var job: Job? = null

fun startPresenter() {
  job = loadData()
}

fun stopPresenter() {
  // Safe even if startPresenter() was never called.
  job?.cancel()
}

fun loadData() = launch(UI) {

  view.showLoading()
  
  val task1 = async(coroutineContext + CommonPool) {
    provider.loadData1()
  }
  val result1 = task1.await()

  val task2 = async(coroutineContext + CommonPool) {
    provider.loadData2(result1)
  }
  val result2 = task2.await()

  view.showData(result2)
}
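
For newer kotlinx.coroutines releases, the same template might be sketched roughly as follows. This is my guess at a present-day shape, not something I tested in this post. DataView, DataProvider, the injected dispatchers and runDemo are hypothetical names, and withContext stands in for the async(...).await() pairs. On Android the UI dispatcher would presumably be Dispatchers.Main; here a single-thread executor plays that role so the sketch runs on a plain JVM.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

// Hypothetical collaborators, named after the calls in the template above.
interface DataView { fun showLoading(); fun showData(data: String) }
interface DataProvider { fun loadData1(): String; fun loadData2(input: String): String }

class Presenter(
    private val view: DataView,
    private val provider: DataProvider,
    uiDispatcher: CoroutineDispatcher,
    private val backgroundDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
    // One scope per presenter; cancelling it cancels everything started from it.
    private val scope = CoroutineScope(SupervisorJob() + uiDispatcher)

    fun startPresenter(): Job = scope.launch {
        view.showLoading()
        // Each step runs in the background, and the result comes back on the UI dispatcher.
        val result1 = withContext(backgroundDispatcher) { provider.loadData1() }
        val result2 = withContext(backgroundDispatcher) { provider.loadData2(result1) }
        view.showData(result2)
    }

    fun stopPresenter() {
        scope.cancel()
    }
}

// Runs the presenter once with fake collaborators and returns what the view was shown.
fun runDemo(): List<String> {
    val shown = mutableListOf<String>()
    val view = object : DataView {
        override fun showLoading() { shown += "loading" }
        override fun showData(data: String) { shown += "data: $data" }
    }
    val provider = object : DataProvider {
        override fun loadData1() = "one"
        override fun loadData2(input: String) = "$input+two"
    }
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { ui ->
        val presenter = Presenter(view, provider, ui)
        runBlocking { presenter.startPresenter().join() }
        presenter.stopPresenter()
    }
    return shown
}

fun main() {
    println(runDemo()) // [loading, data: one+two]
}
```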

In the end, it is clear that this all came down to how you handle the coroutine context.

Kotlin で 非同期処理 Coroutine #1 ~ launch(), async() (Asynchronous processing in Kotlin: Coroutine #1 ~ launch(), async())
Cancellation delivering design · Issue #114 · Kotlin/kotlinx.coroutines
Intrinsic suspend val coroutineContext : KT-17609