Redditを見ていると便利そうな記事を発見した。
この記事では、MutexやSemaphore、そしてCyclic Barrierの使い方について解説している。
セマフォとは同時に一定数のみがリソースにアクセスできるようにする、つまり「いくつまでなら同時に使えるか」を安全に管理するための、並行性制御のプリミティブである。駐車場の空き表示もセマフォだと考えることができるし、トイレの個室もセマフォだと考えられる。同時にアクセスしたら大変だからね。
https://typelevel.org/cats-effect/api/3.x/cats/effect/std/Semaphore.html
そして同時に許可できる数が1に固定されているものを特にミューテックスと呼ぶ。対面交互通行になっている工事現場の道路とかはミューテックスだ。
以前自分もセマフォについていくつか記事を書いていた:
しかし、最初に挙げた記事での書き方がいくぶんかスマートだと思ったので紹介する。
permit
メソッドを使う
Semaphore
にはpermit
メソッドがある:
シグネチャはdef permit: kernel.Resource[F, Unit]
になっていて、呼ぶとリソースがもらえる。Cats Effectにおける「リソース」とは、「なにかを取得する」ための操作と「それを解放する」ための操作とを対にしたもので、必ず安全に操作できることが保証されている制御用のプリミティブだ。
例外が発生するなどしても必ずリソースの解放処理が呼ばれるため、「後でちゃんと解放しなければならない」ような概念の記述にうってつけだ。
さて、permit
はセマフォをリソース化してくれるので、以下のように書くだけで「セマフォが空くまで待機し、セマフォが空いたら処理し、終了したらセマフォを返す」操作を記述できる:
val smph = ??? val showMessage: IO[Unit] = smph.permit.use(_ => IO.println("yo"))
実際のセマフォの用意と組み合わせると、以下のように書くことになる:
import scala.concurrent.duration.* import cats.effect.std.Semaphore def heavyTask(): IO[Unit] = for { _ <- IO.sleep(1.seconds) _ <- IO.println("done!") } yield () for { // セマフォを作る(外部からもらってきてもいい) smph <- Semaphore[IO](10) // セマフォを使いつつ重いタスクをするようなタスクを作成する doHeavyTask = () => smph.permit.use(_ => heavyTask()) // 100個同時実行する _ <- (1 to 100).toList.parTraverse(_ => doHeavyTask()) } yield ()
このように書くことで、やりたいことの関心が以下のように分離されて綺麗に書ける:
- セマフォの仕様。いくつまで同時にアクセスできるのか?
- セマフォを利用する処理。どのタスクがセマフォの介入を必要とするのか?
- 処理の並行(並列)実行。セマフォのことを気にせずに、ただ沢山呼べばよいし、もちろん直列に呼ぶこともできる。