Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

Cats Effect 3のDeferredでGoroutine / channelっぽいことをする

GolangにはGoroutineという良いやつがある。非同期処理をいい感じにやってくれる賢い軽量スレッドだよ。

go-tour-jp.appspot.com

そして、Goroutine同士はchannelという概念を通じて会話することができる。

go-tour-jp.appspot.com

他方、Scalaの非同期処理をいい感じにやるライブラリであるCats Effectにも似たことをする機能がある。

Deferred[A]は、一回だけ入力することができるchannelのようなもの。

  • IO.defered[A] -- deferredを生成する。
  • deferred.complete(a: A) -- deferredに値を渡して完了させる。
  • deferred.get -- deferredから値を取得しようと試みる。その間論理的にブロックする。
    • 論理的にブロック: 実際にスレッドがブロックするのではなく、Cats Effectランタイムはスレッドを他の処理に使い回す。
  • 一度completeしたDeferredからは何度でもgetできる

例えば、このように使う:

trait LikeGoroutine {
  import scala.concurrent.duration._
  import scala.language.postfixOps
  import cats.effect.Deferred

  def runGoroutine = for {
    _ <- showThreadInfo *> IO.println("Running three Fibers")
    chan <- IO.deferred[Int]
    _ <- (triggerAfterThreeSeconds(chan), waitForChan(chan), waitForChan(chan)) parTupled
  } yield ()

  def triggerAfterThreeSeconds(chan: Deferred[IO, Int]): IO[Unit] = {
    for {
      _ <- showThreadInfo *> IO.println("trigger Deferred after 3 seconds...")
      _ <- IO.sleep(3 seconds)
      _ <- showThreadInfo *> IO.println("triggering")
      _ <- chan.complete(42)
    } yield ()
  }

  def waitForChan(chan: Deferred[IO, Int]): IO[Unit] = {
    for {
      _ <- showThreadInfo *> IO.println("waiting for Deferred...")
      n <- chan.get
      _ <- showThreadInfo *> IO.println(s"got value $n !!")
    } yield ()
  }

  def showThreadInfo: IO[Unit] = IO.print(s"[${Thread.currentThread().getName()}] ")
}

runGoroutineを実行すると、出力は以下のようになる:

Available #CPU: 12
[io-compute-5] Running three Fibers
[io-compute-5] trigger Deferred after 3 seconds...
[io-compute-5] waiting for Deferred...
[io-compute-5] waiting for Deferred...
[io-compute-12] triggering
[io-compute-7] got value 42 !!
[io-compute-15] got value 42 !!

runGoroutineの中では、chanという名前でDeferred[Int]を作成している。その後、triggerAfterThreeSecondsと、2つのwaitForChanとをparTupledによって並行実行している。

triggerAfterThreeSecondsがその名の通り3秒後にchanに値を押し込むまで、waitForChanは別スレッドで待ち続ける。3秒経過すると、waitForChanchan.getがブロック解除され、got value 42 !!と表示されている。

channelとDeferredとの違い

ここまで見た通り、GolangのchannelとCE3のDeferredとは、「読み取るまでブロックする」という点が同じで、他はいくつか異なる:

  • channelはsendするときもブロックするが、Deferredは特にブロックしない
  • channelは一度のreceiveにつき1度のデータを受信するが、Deferredは一度入力した値を何度でも出力する

ちなみに、CE3のQueueがchannel同等の処理(送受信でブロックする、複数回の入力が可能)を実は提供しているのだが、まだ勉強していないのでまた今度。

★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?