Kotlin コルーチン (coroutine) でやりたい「非同期直列」

いまや JSON な REST API を使うとき、よくある「非同期な連続するリクエスト」。

かんたんに書きたいですよね。

Kotlin で非同期といえばコルーチンですよね。

しかし「コルーチン」とカタカナで書くと気持ち悪いですね!

experiment で仕様もまだまだ更新しているようですが、自分なりに咀嚼してその瞬間で答えを持っておくことは大事なことと思っています。

なので、少しやってみました!

きっかけ

楽勝と思っていました。
すいません。

準備

実行しているスレッド名をわかりやすくログに表示させるようにしておきます。


private fun log(message: String = "") {
  println("[%s] %s".format(Thread.currentThread().name, message))
}

試していく

任意の Activityの中で実行します。

まずは、


log("START")
for (i in 1..5) {
  log("$i")
  Thread.sleep(1000)
}
log("END")


I/System.out: [main] START
I/System.out: [main] 1
I/System.out: [main] 2
I/System.out: [main] 3
I/System.out: [main] 4
I/System.out: [main] 5
24334-24334/com.example.helloworld I/System.out: [main] END

これを launch で非同期処理にします。


log("START")
launch {
  log("START")
  for (i in 1..5) {
    log("$i")
    delay(1000)
  }
  log("END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] START
I/System.out: [ForkJoinPool.commonPool-worker-1] 1
I/System.out: [ForkJoinPool.commonPool-worker-1] 2
I/System.out: [ForkJoinPool.commonPool-worker-1] 3
I/System.out: [ForkJoinPool.commonPool-worker-1] 4
I/System.out: [ForkJoinPool.commonPool-worker-1] 5
I/System.out: [ForkJoinPool.commonPool-worker-1] END

非同期処理には「キャンセル」処理は必須です。ユーザからのイベント以外にもAndroidアプリには必ずライフサイクルがつきまといます。


log("START")
val job = launch {
  log("START")
  for (i in 1..5) {
    log("$i")
    delay(1000)
  }
  log("END")
}
log("END")

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] START
I/System.out: [ForkJoinPool.commonPool-worker-1] 1
I/System.out: [ForkJoinPool.commonPool-worker-1] 2
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@26aa10a

並列で非同期処理。


log("START")
launch {
  log("LAUNCH - START")
  async {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }
  async {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }
  log("LAUNCH - END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - B - END

直列で非同期処理。


log("START")
launch {
  log("LAUNCH - START")
  async {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }.join()
  async {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }.join()
  log("LAUNCH - END")
}
log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 4
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - END
I/System.out: [ForkJoinPool.commonPool-worker-3] LAUNCH - END

これに、先程のキャンセル処理を加えて、直列 + キャンセル処理 に。
coroutineContext を使って、親をキャンセルで子もキャンセルします。


log("START")
val job = launch {

  log("LAUNCH - START")

  async(coroutineContext) {
    log("ASYNC - A - START")
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
    }
    log("ASYNC - A - END")
  }.join()

  async(coroutineContext) {
    log("ASYNC - B - START")
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
    }
    log("ASYNC - B - END")
  }.join()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [main] Canceled!! StandaloneCoroutine{ Cancelled }@d4f0b6b

それぞれの非同期処理から戻り値を取ります。


// 直列 + 戻り値 + キャンセル
log("START")
val job = launch {

  log("LAUNCH - START")

  val taskA = async(coroutineContext) {
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  log("A : ${taskA.await()}")

  val taskB = async(coroutineContext) {
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  log("B : ${taskB.await()}")

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [ForkJoinPool.commonPool-worker-1] LAUNCH - START
I/System.out: [main] END
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-2] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - A - END
I/System.out: [ForkJoinPool.commonPool-worker-1] A : 12345
I/System.out: [ForkJoinPool.commonPool-worker-2] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-2] B - 2
I/System.out: [main] Canceled!! StandaloneCoroutine{ Cancelled }@5dc2361

戻り値はUIスレッドで受けたいですよね。


// 直列 + 戻り値 (-> UIスレッド) + キャンセル
log("START")
val job = launch(UI) { // @

  log("LAUNCH - START")

  val taskA = async(coroutineContext) { // @
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  val valueA = taskA.await()
  log("A : $valueA")
  Toast.makeText(this@MainActivity, "A : $valueA", Toast.LENGTH_SHORT).show()

  val taskB = async(coroutineContext) { // @
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  val valueB = taskB.await();
  log("B : $valueB")
  Toast.makeText(this@MainActivity, "B : $valueB", Toast.LENGTH_SHORT).show()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [main] LAUNCH - START
I/System.out: [main] ASYNC - A - START
I/System.out: [main] A - 1
I/System.out: [main] A - 2
I/System.out: [main] A - 3
I/System.out: [main] A - 4
I/System.out: [main] A - 5
I/System.out: [main] ASYNC - A - END
I/System.out: [main] A : 12345
I/System.out: [main] ASYNC - B - START
I/System.out: [main] B - 1
I/System.out: [main] B - 2
I/System.out: [main] B - 3
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@20ac62d

動き的にはここまででいいのかもしれませんが、非同期処理が [main] で実行されているのが気になります。

coroutine内では「+」を使ってコンテキストを override できるようです。


// 直列(バックグラウンド) + 戻り値 (-> UIスレッド) + キャンセル
log("START")
val job = launch(UI) {

  log("LAUNCH - START")

  val taskA = async(coroutineContext + CommonPool) { // @
    log("ASYNC - A - START")
    val sa = StringBuilder()
    for (i in 1..5) {
      log("A - $i")
      delay(1000)
      sa.append(i)
    }
    log("ASYNC - A - END")
    return@async sa.toString()
  }
  val valueA = taskA.await()
  log("A : $valueA")
  Toast.makeText(this@MainActivity, "A : $valueA", Toast.LENGTH_SHORT).show()

  val taskB = async(coroutineContext + CommonPool) { // @
    log("ASYNC - B - START")
    val sb = StringBuilder()
    for (i in 1..5) {
      log("B - $i")
      delay(1000)
      sb.append(i)
    }
    log("ASYNC - B - END")
    return@async sb.toString()
  }
  val valueB = taskB.await();
  log("B : $valueB")
  Toast.makeText(this@MainActivity, "B : $valueB", Toast.LENGTH_SHORT).show()

  log("LAUNCH - END")
}

fab.setOnClickListener {
  job.cancel()
  log("Canceled!! $job")
}

log("END")


I/System.out: [main] START
I/System.out: [main] END
I/System.out: [main] LAUNCH - START
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - A - START
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 3
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 4
I/System.out: [ForkJoinPool.commonPool-worker-1] A - 5
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - A - END
I/System.out: [main] A : 12345
I/System.out: [ForkJoinPool.commonPool-worker-1] ASYNC - B - START
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 1
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 2
I/System.out: [ForkJoinPool.commonPool-worker-1] B - 3
I/System.out: [main] Canceled!! StandaloneCoroutine{Cancelling}@d8e310c

これで、一応、やりたかったことは達成できました。

知らんけど。

まとめ

実装を見据えて、Presenter向けにコピペテンプレートにしておきます。


private lateinit var job: Job

fun startPresenter() {
  job = loadData()
}

fun stopPresenter() {
  job.cancel()
}

fun loadData() = launch(UI) {

  view.showLoading()
  
  val task1 = async(coroutineContext + CommonPool) {
    provider.loadData1()
  }
  val result1 = task1.await()

  val task2 = async(coroutineContext + CommonPool) {
    provider.loadData2(result1)
  }
  val result2 = task2.await()

  view.showData(result2)
}

結局は、「コルーチンコンテキストの取扱い」だったことがよくわかります。

Kotlin で 非同期処理 Coroutine #1 ~ launch(), async()
Cancellation delivering design · Issue #114 · Kotlin/kotlinx.coroutines
Intrinsic suspend val coroutineContext : KT-17609


Architecture Blueprints の非同期処理実装にみる Android SDK の方向性

MVP、MVVM、Clean Architecture、Dagger2、Data Binding、Archtecture Components などいろいろな組み合わせの実装例が ToDoアプリにて公開されています。

googlesamples/android-architecture: A collection of samples to discuss and showcase different architectural tools and patterns for Android apps.

非同期処理部分を見てみると現在はすべて(todo-mvp-rxjavaを除く)が以下の実装となり、非同期処理の主役であった AsyncTask/Loader API を利用した記述は消え去っています。

まず、java.util.concurrent.Executor(s) を使って、AppExecutors を作っておいて、


open class AppExecutors constructor(
    val diskIO: Executor = DiskIOThreadExecutor(),
    val networkIO: Executor = Executors.newFixedThreadPool(THREAD_COUNT),
    val mainThread: Executor = MainThreadExecutor()
) {

  private class MainThreadExecutor : Executor {

    private val mainThreadHandler = Handler(Looper.getMainLooper())

    override fun execute(command: Runnable) {
      mainThreadHandler.post(command)
    }
  }
}

AppExecutors.kt

それに対応するデータベースやストレージ向けのExecutorを作ります。


class DiskIOThreadExecutor : Executor {

  private val diskIO = Executors.newSingleThreadExecutor()

  override fun execute(command: Runnable) { diskIO.execute(command) }
}

DiskIOThreadExecutor.kt

これらを使って以下のようにして非同期処理を実装します。


appExecutors.diskIO.execute {

  // IOスレッドで実行する
  // ...

  appExecutors.mainThread.execute {

    // メインスレッドで実行する
    // ...

  }

}

実装例では、コールバックを使ってPresenterまで伝達しています。


override fun getTasks(callback: TasksDataSource.LoadTasksCallback) {

  appExecutors.diskIO.execute {

    // IOスレッドで実行する
    val tasks = tasksDao.getTasks()

    appExecutors.mainThread.execute {

      // メインスレッドで実行する
      if (tasks.isEmpty()) {
        callback.onDataNotAvailable()
      } else {
        callback.onTasksLoaded(tasks)
      }

    }

  }
}

TasksLocalDataSource.kt

AsyncTask/Loader APIs の排除の方向性は、「Deprecated(廃止予定) samples」に移動されたブランチからも認識できます。

googlesamples/android-architecture: A collection of samples to discuss and showcase different architectural tools and patterns for Android apps.

この流れについては、droidcon NYC 2017 - Android Architecture Round Table でも、話が挙がっていました。

it looks like Google is abandoning old API is like loaders and recommending patterns there are much less coupled with the framework, which is good but what happens with these API is are we abundant in then and I'm talking about classes like sync adapters loader async tasks etc

Google はLoaderのような古いAPI や フレームワークと関係の薄いパターンを推奨することをやめているように見えます。 それはいいことですが、それら古いAPIを捨てることは何を引き起こすか、AsyncAdapter や Loaderなどについて話したいと思います。

今後の、Android SDKは、フレームワークを意識したAPIが増えていくのでしょう。


すばやく理解する「Room x RxJava 」

いい記事があったので。

Room 🔗 RxJava – Google Developers – Medium

まずは、Room で Dao.


@Query(“SELECT * FROM Users WHERE id = :userId”)
User getUserById(String userId);

ここまでで問題なのは、

1. 同期呼び出しでブロッキング。
2. データ変更時に再度呼び出す必要がある。

ということで、RxJava を使いたくなります。

Room は RxJava2.x に対応しています。

Adding Components to your Project | Android Developers

どのように使うのか?

Maybe


@Query(“SELECT * FROM Users WHERE id = :userId”)
Maybe<User> getUserById(String userId);

1. 該当ユーザがなければ、何も返さずに complete。
2. 該当ユーザがあれば、onSuccess となり complete。
3. Maybe が complete されたあとにユーザー情報が更新されても何もしない。

Single


@Query(“SELECT * FROM Users WHERE id = :userId”)
Single<User> getUserById(String userId);

1. 該当ユーザがなければ、何も返さず onError(EmptyResultException)。
2. 該当ユーザがあれば、onSuccess。
3. Single が complete されたあとにユーザー情報が更新されても何もしない。

Flowable


@Query(“SELECT * FROM Users WHERE id = :userId”)
Flowable<User> getUserById(String userId);

1. 該当ユーザはなければ、何も返さず emit もされない。当然、onNext も onError も呼ばれない。
2. ユーザが存在すれば、onNext。
3. ユーザ情報が更新されるたびに、自動で emit されるので、UI上を最新データに更新させることが可能になる。

 

まとめ

これだけ数行でデータベース、非同期処理を簡潔明快に説明できる Room x RxJava の組み合わせ。

おまけに Observable から細分化された RxJava2.x の主役たちの使い方も理解することができます。

素晴らしいですよね。