Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

Cats Effect 3でセマフォを使うときの便利なイディオム

Redditを見ていると便利そうな記事を発見した。

lukastymo.com

この記事では、MutexやSemaphore、そしてCyclic Barrierの使い方について解説している。

セマフォとは同時に一定数のみがリソースにアクセスできるようにする、つまり「いくつまでなら同時に使えるか」を安全に管理するための、並行性制御のプリミティブである。駐車場の空き表示もセマフォだと考えることができるし、トイレの個室もセマフォだと考えられる。同時にアクセスしたら大変だからね。

typelevel.org

https://typelevel.org/cats-effect/api/3.x/cats/effect/std/Semaphore.html

そして同時に許可できる数が1に固定されているものを特にミューテックスと呼ぶ。対面交互通行になっている工事現場の道路とかはミューテックスだ。

以前自分もセマフォについていくつか記事を書いていた:

blog.3qe.us

blog.3qe.us

しかし、最初に挙げた記事での書き方がいくぶんかスマートだと思ったので紹介する。

permitメソッドを使う

Semaphoreにはpermitメソッドがある:

https://typelevel.org/cats-effect/api/3.x/cats/effect/std/Semaphore.html#permit:cats.effect.kernel.Resource%5BF,Unit%5D

シグネチャはdef permit: kernel.Resource[F, Unit]になっていて、呼ぶとリソースがもらえる。Cats Effectにおける「リソース」とは、「なにかを取得する」ための操作と「それを解放する」ための操作とを対にしたもので、必ず安全に操作できることが保証されている制御用のプリミティブだ。 例外が発生するなどしても必ずリソースの解放処理が呼ばれるため、「後でちゃんと解放しなければならない」ような概念の記述にうってつけだ。

さて、permitはセマフォをリソース化してくれるので、以下のように書くだけで「セマフォが空くまで待機し、セマフォが空いたら処理し、終了したらセマフォを返す」操作を記述できる:

val smph = ???
val showMessage: IO[Unit] = smph.permit.use(_ => IO.println("yo"))

実際のセマフォの用意と組み合わせると、以下のように書くことになる:

import scala.concurrent.duration.*
import cats.effect.std.Semaphore

def heavyTask(): IO[Unit] = for {
  _ <- IO.sleep(1.seconds)
  _ <- IO.println("done!")
} yield ()

for {
  // セマフォを作る(外部からもらってきてもいい)
  smph <- Semaphore[IO](10)
  // セマフォを使いつつ重いタスクをするようなタスクを作成する
  doHeavyTask = () => smph.permit.use(_ => heavyTask())
  // 100個同時実行する
  _ <- (1 to 100).toList.parTraverse(_ => doHeavyTask())
} yield ()

このように書くことで、やりたいことの関心が以下のように分離されて綺麗に書ける:

  • セマフォの仕様。いくつまで同時にアクセスできるのか?
  • セマフォを利用する処理。どのタスクがセマフォの介入を必要とするのか?
  • 処理の並行(並列)実行。セマフォのことを気にせずに、ただ沢山呼べばよいし、もちろん直列に呼ぶこともできる。
★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?