Scalaの軽量スレッドなどを提供するCats Effectで、Seq
に詰まったタスクを並行に実行したいが同時実行数は制限したいということがあったので、それに対応する実装をしたメモ。
IOのsequence
Cats Effectでは、sequence
を使うことでSeq[IO[A]]
をIO[Seq[A]]
の形に変換することができる*1。セマンティック的には、コレクション内の全てのIO
を逐次実行し、それをまとめて返すようなIO
という感じの振る舞いになる。式を組み立てていくときにはIO
でくるまれている必要があるので、頻出のイディオムだ。
val print42: IO[Int] = IO.println("42") >> IO.sleep(1 second) >> IO.pure(42) val ios: Seq[IO[Int]] = Seq(print42, print42, print42) // このままだと実行できない val sequenced: IO[Seq[Int]] = ios.sequence // IOにまとめることで逐次実行できるようになる。実行すると3秒かかる
parsequenceによる並行実行
さて、sequence
には1つバリエーションがあり、parSequence
と呼ばれる。これは直列実行ではなく並行実行を行うものだ。
val print42: IO[Int] = IO.println("42") >> IO.sleep(1 second) >> IO.pure(42) val ios: Seq[IO[Int]] = Seq(print42, print42, print42) // このままだと実行できない val sequenced: IO[Seq[Int]] = ios.parSequence // IOにまとめることで並行実行できるようになる。実行すると1秒かかる
返り値の型のシグネチャは全く同じだが、セマンティック的には同時実行とか並行実行と呼んでさしつかえない挙動になる(厳密に同時に実行されるかは保証しないので、並列実行とは書かなかった)。
同時実行数の制限が必要なこともある
ところで、Cats EffectではこのようにparSequence
などを使って並列実行を行うと軽量スレッド数の上限(上限はチューニングによって変動する)まで可能な限り同時実行を行おうとするが、これが望まれないこともある。例えば自分の例だと100個のURIに対してヘッドレスブラウザを立ち上げてスクリーンショットを行なわせようとしたが、何も制約を加えないと100並列でchromeが立ち上がってメモリやCPUリソースを枯渇させてしまう。最大10個までのchromeまで立ち上がってよい、といった制約を設けたい場合、どのように実装すればよいだろうか?
Semaphoreによる実装
これを解決するために、Cats Effectが提供するSemaphoreを利用する。Semaphoreとは並行制御用の基礎的なコンポーネントで、同時に同じリソースにアクセスできるコードを一定数以下に制約するものである。
ちなみに、Semaphoreの許容する同時アクセス数を1にまで落としたものを特にMutexと呼ぶ。Mutexではあるリソースへのアクセスは完全に排他的になる。
Semaphore.permit
を利用するとIO[Resource[IO, Unit]]
が得られ、その中ではロックが取られた状態になる。また、Resource
とは、「使う前後に何らかの処理が必ず必要な資源」を表現するためのCats Effect上の概念だ。この概念によって、ロック・ファイルハンドラ・プログレスバー、バックグラウンド処理などが統一的に操作可能になる。
これらの性質をうまく利用して、特定のリソース・・・ここではヘッドレスブラウザの同時アクセス数を10に絞ってみよう。「ヘッドレスブラウザはセマフォによる制御が必要なリソースである」ということを表現し、並行制御に組込もう。
まずは、ヘッドレスブラウザを表現するクラスであるChromeScreenShot
をCats EffectのResource
型に変換しよう。セマフォによる制御を行うには、セマフォとmap
で合成してしまうだけでよい。
import cats.effect.IO import cats.effect.kernel.Resource import cats.effect.std.Semaphore import cats.implicits._ def screenShotResource: IO[Resource[IO, ScreenShot]] = for { smph <- Semaphore[IO](10) // 上限10のセマフォを生成する } yield smph.permit.map { _ => // そのセマフォの中では new ChromeScreenShot(...) // ChromeScreenShotが使える ということを表現する }
Semaphoreにmap
することで、Semaphoreに合成した新たなSemaphoreっぽいリソースを表現できる。ここではまだResource[IO, ScreenShot]
ではなくてIO[Resource[IO, ScreenShot]]
であることに注意しよう。リソース確保自体に副作用が発生するおそれがあるから、リソースを用意することもIO
に包まなければならないためだ(まぁ、あまり深く考えなくてよい)。
- 「スクリーンショットのためのResourceを用意する操作」
screenShotResource
を定義した。
次に、このscreenShotResource
を実際に利用する。
val shots: IO[Seq[ShotImage]] = for { ss <- screenShotResource // ここで実際のChromeScreenShotを向いたResourceが生成される imgs <- for { sceneImages <- listOfScreenShotTargetURI.map { pair => // ループの中でResource.useを呼び出す ss.use { ss => ss.getScreenShotOfSomeURI(...) } // 入るときにセマフォのカウンタが1減り、出ていくときに1戻される。0のときは1になるまでブロックされる }.parSequence // これを並行化する } yield imgs
screenShotResource
からリソースを生成する箇所と、parSequence
する箇所がちゃんと分かれていることに注意してほしい。parSequence
の中でscreenShotResource
からリソースを生成すると、並行処理の中で別々のセマフォが生成されて全く役立たずになってしまう。セマフォやミューテックスのリソースは外側で建て、それを内側の込み入った箇所で参照する、という点にさえ気をつければ、Cats Effectでの並行プログラミングはちゃんとうまくいく。リソースをIO
から取出す箇所を間違えたせいで同一のリソースにアクセスできなくなってしまうのはCats Effect初心者あるあるだ(自分も踏み抜いた!)。
これにより、最大10並行でSeq
に入ったIO
を処理することができるようになった。
- まずスクリーンショットのためのResourceを確保し、並行処理中にそれを参照させて同時実行数を制御しつつスクリーンショットを撮影する処理を定義した。
- 注意点: 共有するリソースは並行処理に入る前に用意し、その後で並行処理に入る。
まとめ
この記事では以下のことを行った:
parSequence
で複数のIO
の並行実行を行った- 同時アクセスに制限が必要なリソースをCats Effectの
Resource
として、Semaphore
との合成によって表現した Resource
のuse
メソッドによって実際にセマフォのロックを取るコードを表現し、for
式に統合した
この手法の優れている点は、ロックがResource
という共通の概念に包括され、統合的に扱われている点だ。
*1:技術的には、IO型はCatsのApplicative型クラスのインスタンスであり、Seq型がTraverse型クラスのインスタンスだからである