Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

Cats Effectで同時実行数を制御しながらIOを並行実行する

Scalaの軽量スレッドなどを提供するCats Effectで、Seqに詰まったタスクを並行に実行したいが同時実行数は制限したいということがあったので、それに対応する実装をしたメモ。

typelevel.org

IOのsequence

Cats Effectでは、sequenceを使うことでSeq[IO[A]]IO[Seq[A]]の形に変換することができる*1。セマンティック的には、コレクション内の全てのIOを逐次実行し、それをまとめて返すようなIOという感じの振る舞いになる。式を組み立てていくときにはIOでくるまれている必要があるので、頻出のイディオムだ。

val print42: IO[Int] = IO.println("42") >> IO.sleep(1 second) >> IO.pure(42)
val ios: Seq[IO[Int]] = Seq(print42, print42, print42) // このままだと実行できない

val sequenced: IO[Seq[Int]] = ios.sequence // IOにまとめることで逐次実行できるようになる。実行すると3秒かかる

parsequenceによる並行実行

さて、sequenceには1つバリエーションがあり、parSequenceと呼ばれる。これは直列実行ではなく並行実行を行うものだ。

val print42: IO[Int] = IO.println("42") >> IO.sleep(1 second) >> IO.pure(42)
val ios: Seq[IO[Int]] = Seq(print42, print42, print42) // このままだと実行できない

val sequenced: IO[Seq[Int]] = ios.parSequence // IOにまとめることで並行実行できるようになる。実行すると1秒かかる

返り値の型のシグネチャは全く同じだが、セマンティック的には同時実行とか並行実行と呼んでさしつかえない挙動になる(厳密に同時に実行されるかは保証しないので、並列実行とは書かなかった)。

同時実行数の制限が必要なこともある

ところで、Cats EffectではこのようにparSequenceなどを使って並列実行を行うと軽量スレッド数の上限(上限はチューニングによって変動する)まで可能な限り同時実行を行おうとするが、これが望まれないこともある。例えば自分の例だと100個のURIに対してヘッドレスブラウザを立ち上げてスクリーンショットを行なわせようとしたが、何も制約を加えないと100並列でchromeが立ち上がってメモリやCPUリソースを枯渇させてしまう。最大10個までのchromeまで立ち上がってよい、といった制約を設けたい場合、どのように実装すればよいだろうか?

Semaphoreによる実装

これを解決するために、Cats Effectが提供するSemaphoreを利用する。Semaphoreとは並行制御用の基礎的なコンポーネントで、同時に同じリソースにアクセスできるコードを一定数以下に制約するものである。

typelevel.org

ja.wikipedia.org

ちなみに、Semaphoreの許容する同時アクセス数を1にまで落としたものを特にMutexと呼ぶ。Mutexではあるリソースへのアクセスは完全に排他的になる。

ja.wikipedia.org

Semaphore.permitを利用するとIO[Resource[IO, Unit]]が得られ、その中ではロックが取られた状態になる。また、Resourceとは、「使う前後に何らかの処理が必ず必要な資源」を表現するためのCats Effect上の概念だ。この概念によって、ロック・ファイルハンドラ・プログレスバー、バックグラウンド処理などが統一的に操作可能になる。

typelevel.org

これらの性質をうまく利用して、特定のリソース・・・ここではヘッドレスブラウザの同時アクセス数を10に絞ってみよう。「ヘッドレスブラウザはセマフォによる制御が必要なリソースである」ということを表現し、並行制御に組込もう。

まずは、ヘッドレスブラウザを表現するクラスであるChromeScreenShotをCats EffectのResource型に変換しよう。セマフォによる制御を行うには、セマフォとmapで合成してしまうだけでよい。

import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.std.Semaphore
import cats.implicits._

def screenShotResource: IO[Resource[IO, ScreenShot]] =
  for {
    smph <- Semaphore[IO](10) // 上限10のセマフォを生成する
  } yield smph.permit.map { _ => // そのセマフォの中では
    new ChromeScreenShot(...) // ChromeScreenShotが使える ということを表現する
  }

Semaphoreにmapすることで、Semaphoreに合成した新たなSemaphoreっぽいリソースを表現できる。ここではまだResource[IO, ScreenShot]ではなくてIO[Resource[IO, ScreenShot]]であることに注意しよう。リソース確保自体に副作用が発生するおそれがあるから、リソースを用意することもIOに包まなければならないためだ(まぁ、あまり深く考えなくてよい)。

  • 「スクリーンショットのためのResourceを用意する操作」screenShotResourceを定義した。

次に、このscreenShotResourceを実際に利用する。

val shots: IO[Seq[ShotImage]] = for {
  ss <- screenShotResource // ここで実際のChromeScreenShotを向いたResourceが生成される
  imgs <- for {
    sceneImages <- listOfScreenShotTargetURI.map { pair =>
      // ループの中でResource.useを呼び出す
      ss.use { ss => ss.getScreenShotOfSomeURI(...) } // 入るときにセマフォのカウンタが1減り、出ていくときに1戻される。0のときは1になるまでブロックされる
  }.parSequence // これを並行化する
} yield imgs

screenShotResourceからリソースを生成する箇所と、parSequenceする箇所がちゃんと分かれていることに注意してほしい。parSequenceの中でscreenShotResourceからリソースを生成すると、並行処理の中で別々のセマフォが生成されて全く役立たずになってしまう。セマフォやミューテックスのリソースは外側で建て、それを内側の込み入った箇所で参照する、という点にさえ気をつければ、Cats Effectでの並行プログラミングはちゃんとうまくいく。リソースをIOから取出す箇所を間違えたせいで同一のリソースにアクセスできなくなってしまうのはCats Effect初心者あるあるだ(自分も踏み抜いた!)。

これにより、最大10並行でSeqに入ったIOを処理することができるようになった。

  • まずスクリーンショットのためのResourceを確保し、並行処理中にそれを参照させて同時実行数を制御しつつスクリーンショットを撮影する処理を定義した。
  • 注意点: 共有するリソースは並行処理に入る前に用意し、その後で並行処理に入る。

まとめ

この記事では以下のことを行った:

  • parSequenceで複数のIOの並行実行を行った
  • 同時アクセスに制限が必要なリソースをCats EffectのResourceとして、Semaphoreとの合成によって表現した
  • Resourceuseメソッドによって実際にセマフォのロックを取るコードを表現し、for式に統合した

この手法の優れている点は、ロックがResourceという共通の概念に包括され、統合的に扱われている点だ。

*1:技術的には、IO型はCatsのApplicative型クラスのインスタンスであり、Seq型がTraverse型クラスのインスタンスだからである

★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?