GolangにはGoroutineという良いやつがある。非同期処理をいい感じにやってくれる賢い軽量スレッドだよ。
そして、Goroutine同士はchannelという概念を通じて会話することができる。
他方、Scalaの非同期処理をいい感じにやるライブラリであるCats Effectにも似たことをする機能がある。
Deferred[A]
は、一回だけ入力することができるchannelのようなもの。
IO.defered[A]
-- deferredを生成する。deferred.complete(a: A)
-- deferredに値を渡して完了させる。deferred.get
-- deferredから値を取得しようと試みる。その間論理的にブロックする。- 論理的にブロック: 実際にスレッドがブロックするのではなく、Cats Effectランタイムはスレッドを他の処理に使い回す。
- 一度completeしたDeferredからは何度でもgetできる
例えば、このように使う:
trait LikeGoroutine { import scala.concurrent.duration._ import scala.language.postfixOps import cats.effect.Deferred def runGoroutine = for { _ <- showThreadInfo *> IO.println("Running three Fibers") chan <- IO.deferred[Int] _ <- (triggerAfterThreeSeconds(chan), waitForChan(chan), waitForChan(chan)) parTupled } yield () def triggerAfterThreeSeconds(chan: Deferred[IO, Int]): IO[Unit] = { for { _ <- showThreadInfo *> IO.println("trigger Deferred after 3 seconds...") _ <- IO.sleep(3 seconds) _ <- showThreadInfo *> IO.println("triggering") _ <- chan.complete(42) } yield () } def waitForChan(chan: Deferred[IO, Int]): IO[Unit] = { for { _ <- showThreadInfo *> IO.println("waiting for Deferred...") n <- chan.get _ <- showThreadInfo *> IO.println(s"got value $n !!") } yield () } def showThreadInfo: IO[Unit] = IO.print(s"[${Thread.currentThread().getName()}] ") }
runGoroutine
を実行すると、出力は以下のようになる:
Available #CPU: 12 [io-compute-5] Running three Fibers [io-compute-5] trigger Deferred after 3 seconds... [io-compute-5] waiting for Deferred... [io-compute-5] waiting for Deferred... [io-compute-12] triggering [io-compute-7] got value 42 !! [io-compute-15] got value 42 !!
runGoroutine
の中では、chan
という名前でDeferred[Int]
を作成している。その後、triggerAfterThreeSeconds
と、2つのwaitForChan
とをparTupled
によって並行実行している。
triggerAfterThreeSeconds
がその名の通り3秒後にchan
に値を押し込むまで、waitForChan
は別スレッドで待ち続ける。3秒経過すると、waitForChan
のchan.get
がブロック解除され、got value 42 !!
と表示されている。
channelとDeferredとの違い
ここまで見た通り、GolangのchannelとCE3のDeferredとは、「読み取るまでブロックする」という点が同じで、他はいくつか異なる:
- channelはsendするときもブロックするが、Deferredは特にブロックしない
- channelは一度のreceiveにつき1度のデータを受信するが、Deferredは一度入力した値を何度でも出力する
ちなみに、CE3のQueue
がchannel同等の処理(送受信でブロックする、複数回の入力が可能)を実は提供しているのだが、まだ勉強していないのでまた今度。