GolangにはGoroutineという良いやつがある。非同期処理をいい感じにやってくれる賢い軽量スレッドだよ。
そして、Goroutine同士はchannelという概念を通じて会話することができる。
他方、Scalaの非同期処理をいい感じにやるライブラリであるCats Effectにも似たことをする機能がある。
Deferred[A]は、一回だけ入力することができるchannelのようなもの。
IO.defered[A]-- deferredを生成する。deferred.complete(a: A)-- deferredに値を渡して完了させる。deferred.get-- deferredから値を取得しようと試みる。その間論理的にブロックする。- 論理的にブロック: 実際にスレッドがブロックするのではなく、Cats Effectランタイムはスレッドを他の処理に使い回す。
- 一度completeしたDeferredからは何度でもgetできる
例えば、このように使う:
trait LikeGoroutine { import scala.concurrent.duration._ import scala.language.postfixOps import cats.effect.Deferred def runGoroutine = for { _ <- showThreadInfo *> IO.println("Running three Fibers") chan <- IO.deferred[Int] _ <- (triggerAfterThreeSeconds(chan), waitForChan(chan), waitForChan(chan)) parTupled } yield () def triggerAfterThreeSeconds(chan: Deferred[IO, Int]): IO[Unit] = { for { _ <- showThreadInfo *> IO.println("trigger Deferred after 3 seconds...") _ <- IO.sleep(3 seconds) _ <- showThreadInfo *> IO.println("triggering") _ <- chan.complete(42) } yield () } def waitForChan(chan: Deferred[IO, Int]): IO[Unit] = { for { _ <- showThreadInfo *> IO.println("waiting for Deferred...") n <- chan.get _ <- showThreadInfo *> IO.println(s"got value $n !!") } yield () } def showThreadInfo: IO[Unit] = IO.print(s"[${Thread.currentThread().getName()}] ") }
runGoroutineを実行すると、出力は以下のようになる:
Available #CPU: 12 [io-compute-5] Running three Fibers [io-compute-5] trigger Deferred after 3 seconds... [io-compute-5] waiting for Deferred... [io-compute-5] waiting for Deferred... [io-compute-12] triggering [io-compute-7] got value 42 !! [io-compute-15] got value 42 !!
runGoroutineの中では、chanという名前でDeferred[Int]を作成している。その後、triggerAfterThreeSecondsと、2つのwaitForChanとをparTupledによって並行実行している。
triggerAfterThreeSecondsがその名の通り3秒後にchanに値を押し込むまで、waitForChanは別スレッドで待ち続ける。3秒経過すると、waitForChanのchan.getがブロック解除され、got value 42 !!と表示されている。
channelとDeferredとの違い
ここまで見た通り、GolangのchannelとCE3のDeferredとは、「読み取るまでブロックする」という点が同じで、他はいくつか異なる:
- channelはsendするときもブロックするが、Deferredは特にブロックしない
- channelは一度のreceiveにつき1度のデータを受信するが、Deferredは一度入力した値を何度でも出力する
ちなみに、CE3のQueueがchannel同等の処理(送受信でブロックする、複数回の入力が可能)を実は提供しているのだが、まだ勉強していないのでまた今度。