Scalaの非同期処理まわりのハンドリングをやりやすくするライブラリ、Cats Effectの教科書を読んでいる。
英語は平易で分かりやすいので、諸君もぜひ読んでほしい。
そんな中、Cats Effectの機能の一つであるResourceの面白い応用を思い付いたので紹介する。
Resource
Resource
とは、Cats Effect (以下、CE)に用意されている標準ライブラリの一つで、取得と解放が必要なリソースを表現したもの。
取得と解放が必要なリソースというのはプログラマにはおなじみで、例えばファイルハンドルとか、ソケットとか、要するにプログラマが使ったあとでOS様に返さなければならないリソース一般のことを想定すればよい。こういったリソースはとかく解放忘れを引き起こしがちで、特にマルチスレッド環境下においては解放まわりの実装が困難を極めることで知られている。CEは非同期処理を主眼に置いたライブラリであり、こうしたリソースの扱いが標準ライブラリとして提供されているというわけだ。
イメージとしては、こういう感じ。
きわめて教科書的なResource
の使い方(というか、ほぼ教科書からの引用)を以下に示す。
Resource
とは取得する操作と解放する操作との組であると考えることができる。このため、Resource
はmake
メソッドで得ることができる:
def make[A](acquire: IO[A])(release: A => IO[Unit]): Resource[IO, A]
実際に例えばなんらかの文字列をResource
に見立てると、次のように生成できる:
import cats.effect._ val acquireOps = IO.println("[acq]") *> IO("String") val releaseOps = IO.println("[rel]").void val stringResource: Resource[IO, String] = Resource.make(acquireOps)(_ => releaseOps)
実際にそのリソースが欲しいときは、.use{ resource => ... }
を呼び出すことで使うことができる:
val run = for { _ <- stringResource.use { s => IO.println(s"Using $s") } } yield {} import cats.effect.unsafe.implicits.global run.unsafeRunSync()
すると実行結果は以下の通りになる:
[acq] Using String [rel]
自動的にacquireする処理が走って、releaseする処理が走った。しかも、CEが非同期まわりのモロモロをうまくハンドリングして隠蔽しているので、マルチスレッドでガンガン使っても壊れない。
Fiber as Resource
さて、このResource概念をスレッドに適用することもできる。つまり、スコープに入ってacquireするときになんらかの非同期処理が開始し、スコープを出てreleaseするときにその非同期処理を中止させる、という性質の処理が表現できるようになる。
- Handler as Resource
- Acquire: ハンドラをOSから確保
- Release: ハンドラをOSに返す
- Thread as Resource
- Acquire: バックグラウンドで動作する非同期処理の開始
- Release: 非同期処理の中止
ちょっと脇道に逸れるが、CEではJVMが提供するネイティブスレッドを直接使用せず、軽量スレッドであるFiberを活用する(Fiberの下敷きとなっている仕組みとしてスレッドが存在する)。雑な例えをするならば、GoにおけるGoroutineみたいなやつだ。CEで非同期処理を行うときは、Fiberで平行世界に処理を放り投げるという感じでやっていくことになる。ちなみにFiberの名前の由来はThreadにかけたダジャレらしい(Threadは撚り糸くらいの意味で、Fiberは言わずもがな「繊維」である)。
そういうわけなので、実際はスコープに入って出るときに、Fiberを開始して終了するという処理ができるようになる。つまり、一定区間にわたって軽量バックグラウンド処理を走らせられるのだ。これの何がすごいのかというと…………
ダウンロードインジケータ
ちょっとしたバックグラウンド処理の代表格は、プログレスインジケータだ。重たい処理がメインで走っている間、ユーザが癇癪を起こさないようにクルクルと動き、なんとなく処理が進んでいることを表現する。この処理をうまくCEで実装できることに気付いた。
実装は以下のような感じ。
import scala.concurrent.duration._ import scala.language.postfixOps def verySlowDownload: IO[String] = IO.println("Starting Download") >> IO.sleep(3 second) >> IO { // Finally we got result "/tmp/foobar.txt" } def backgroundDownloadingIndicator: ResourceIO[IO[OutcomeIO[Unit]]] = downloadingIndicator.background def downloadingIndicator: IO[Unit] = IO.sleep(100 milliseconds) *> (IO.print("\r|") *> IO.sleep(100 milliseconds) *> IO.print("\r/") *> IO.sleep(100 milliseconds) *> IO.print("\r-") *> IO.sleep(100 milliseconds) *> IO.print("\r\\") *> IO.sleep(100 milliseconds)).foreverM
どう使うかというと、backgroundDownloadingIndicator.use { ... }
して、その中でverySlowDownload
すればよい:
for { downloadedFile <- backgroundDownloadingIndicator.use { _ => verySlowDownload } _ <- IO.println(s"we have downloaded $downloadedFile") } yield IO.unit
すると、verySlowDownload
が始まったタイミングでインジケータをクルクルするためのFiberが起動して、しばらくしてダウンロードが終わるとサッと終了する。
こういった表示を素朴に1から実装しようとするとかなり面倒になるか、ダウンロード処理とインジケータを表示する処理とをごった煮で書くことになってモジュラーじゃない状態になるのではないかと思う。
感想
教科書を読みながら面白い応用を思い付いて、実際にそれがすぐ実装できたのも面白かった。IO
と書くとどうしても重厚長大なIOモナドのことを想像してしまって面倒そうという印象だったけれど、Cats Effectならかなり手軽に運用できそうな雰囲気がある。これは、Scalaが純粋関数型ではないことにより、必要なタイミングで純粋関数的エッセンスを使えるのが大きいと思う。
dependency
libraryDependencies ++= Seq( "org.typelevel" %% "cats-effect" % "3.3.12", )