Cats Effect 3のセマフォの使い方の例があまりないなと思ったので自分で書いておいた。標語もあるよ。
Cats Effectという、Scalaで非同期処理のハンドリングをやりやすくするためのライブラリがある。
非同期処理のためのライブラリ(より詳細には、副作用をともなう操作をカプセル化するためのライブラリ)なので、当然セマフォが用意されている。このセマフォの使い方について解説していくぜ(ゆっくり解説)
そもそもセマフォって何よ
まずセマフォについて説明する。セマフォというのは、あるコード上の区間やリソースに同時にアクセスできる数を制約するための仕組みのことだ。
セマフォという言葉は元々は腕木信号機のことを示していたらしい。鉄道の信号機も、レールというクリティカルなリソースに同時にアクセスできる列車を1つに制約するための仕組みだと考えることができる。同時に2つの列車が1つのレールにアクセスすると大惨事になってしまうので、列車がレールを占有している間、他の列車は手前で待たなければならない。これと同じことをコードの区間で行うのがプログラミングにおけるセマフォだ。
クリティカルなリソースというのは、例えば以下のようなものだ:
- プログラム内で共有されている読み書き自由な変数
- マルチスレッドで同時に読み書きするとぶっ壊れるおそれがある
- ファイル
- 同時に書き込むとぶっ壊れるおそれがある
- データの一貫性が重要な場合、同時に読むことも危険な場合がある
- n個併設されたシャワールーム
- 同時にn人までしか占有できない。同時に使おうとすると逮捕されるおそれがある
java.security.MessageDigest
のインスタンス- スレッドセーフではないことが知られており、同時にダイジェストを生成しようとすると壊れる
クリティカルなリソースは、シングルスレッドなプログラミングを行っている場合にはあまり気にならないことが多い。というのも、シングルスレッドで動作しているプログラムは同時に2つ以上のコードを実行することは原理的にありえないからである。
前提知識
.sequence
List[IO[A]]
をIO[List[A]]
に変換する。- 具体的には複数処理の待ち合わせに使う。
.parSequence
.sequence
と同じく、List[IO[A]]
をIO[List[A]]
に変換する。- ただし、勝手にスレッドが生えて処理が並列に行われる。並列数はCats Effectが自動制御する。全ての処理が終わったときに次の処理へ進む。
List
の順序は保存される、全部終わるまで待つという特性があるので気楽に使える。- 厳密にはOSネイティブのスレッドではなく、Cats Effectが管理する軽量スレッドのFiberが生成されるのでコストが小さい。
Cats Effect 3によるセマフォ
Cats Effect 3では、セマフォは以下のようにしてIO[Semaphore[IO]]
を取得する。
import cats.effect.IO import cats.effect.std.Semaphore val semaphoreIO: IO[Semaphore[IO]] = Semaphore[IO](セマフォが許容する数)
- いわゆるバイナリセマフォ、ミューテックスの場合は
1
を渡すことになる - 同時にn個占有可能な場合はその数を指定する
注意すべき点として、Semaphore
で得られるのはIO[Semaphore[IO]]
型であり、直接セマフォを得られるわけではないという点がある。これはなぜかというと、まずCats Effectにおいては副作用を伴う全ての操作はIO
で表現する方針が貫かれており、最後に組み立てられたIO
が一気に実行される、という実行モデルになっていて、セマフォ生成も副作用を伴う操作だとみなされているためだと思われる。
実際にセマフォを使った操作の例を示す。
import cats.effect.IO import cats.effect.implicits._ import cats.effect.std.Semaphore import cats.implicits._ // なんらかのアトミックに使ってほしいリソースという想定 // 同時に読んでも書いてもならない var atomicNumber = 0 // 良いセマフォ操作の例 val goodSemaphoreOperation = for { lis <- IO.pure((1 to 50).toList) smph <- Semaphore[IO](1) // ここでsmphという名前のセマフォが作成される // 同一のセマフォをgoodAtomicOperationに渡す atomicResult <- lis.map(n => goodAtomicOperation(n, smph)).parSequence // parSequenceを使っているので、同じセマフォが渡された状態で並列実行される _ <- atomicResult.map(res => IO.println(s"Result: $res")).sequence } yield () def goodAtomicOperation(n: Int, smph: Semaphore[IO]): IO[Int] = for { // 同時に1つの処理しかここを通らないようにしたい // セマフォが使用中の場合はここでブロックする _ <- smph.acquire _ <- IO { atomicNumber += 1 } result <- IO.pure(atomicNumber) // 正しくアトミックに操作できていれば、常に1となる _ <- IO { atomicNumber -= 1 } _ <- smph.release } yield result
セマフォはfor
式の中でIO
が外れて実際のSemaphore[IO]
になる。これがアトミックな操作の中に渡され、アトミックな操作の中ではセマフォの取得と開放が行われている。parSequenceによってファイバーがスポーンする前にSemaphoreを生成しているのが重要である。
このgoodSemaphoreOperation
の実行結果は以下の通りだ:
Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1 Result: 1
全ての結果が1になった。これは、セマフォが適切に共有リソースatomicNumber
のアクセスを管理でき、同時に変数にアクセスしなかったことを意味している。
落とし穴
よくある(というより自分がよくハマる)ミスも示しておき、他山の石としてほしい。先程の例ではスレッドが(厳密にはファイバーが)分岐する前にセマフォを作成していたが、これを分岐した後に行うとどうなるだろうか?
// 悪いセマフォ操作の例 // ここでSemaphoreを定義しても、実際の型はIO[Semaphore[IO]]なので「セマフォを生成」したのではなく「セマフォを生成する操作」を定義したことにしかならない val badSemaphore = Semaphore[IO](1) val badSemaphoreOperation = for { lis <- IO.pure((1 to 50).toList) atomicResult <- lis.map(n => badAtomicOperation(n)).parSequence // ここでスレッドは分岐する _ <- atomicResult.map(res => IO.println(s"Result: $res")).sequence } yield () def badAtomicOperation(n: Int): IO[Int] = for { // ここでbadSemaphoreを使っているが、badSemaphoreはセマフォではなく「セマフォを生成する操作」なので、 // 呼び出す度に別々のセマフォが生成されてしまう。 // 並行処理させてからセマフォを作っても無意味!!! smph <- badSemaphore _ <- smph.acquire // 常に成功してしまう _ <- IO { atomicNumber += 1 } result <- IO.pure(atomicNumber) // boom... _ <- IO { atomicNumber -= 1 } _ <- smph.release } yield result
実行するとどうなるか。
Result: 3 Result: 1 Result: 10 Result: 2 Result: 9 Result: 5 Result: 6 Result: 7 Result: 4 Result: 2 Result: 2 Result: 13 Result: 2 Result: 11 Result: 8 Result: 12 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2 Result: -2
セマフォが共有リソースatomicNumber
を守りきれず、値がめちゃくちゃになってしまった。これがもし原子炉の操作だったら訴追ものだ。
この例では、あらかじめSemaphore[IO](1)
を処理の外側で変数定義しておき、アトミック操作に入ってからこれを使おうとした。しかしこれは間違いである。
実際にセマフォが生成されるのはfor
式で中身を取り出すタイミングなので、badAtomicOperation
の中でbadSemaphore
を使おうとしても、その都度スレッド毎に別々のセマフォが生成されてしまうので、実際のところは何も制約したことにはならないのだ。
Cats Effectでは副作用と純粋な操作とは厳密に峻別されている。型がIO
のときは、取り出すまではその効果は得られないと心得ておく必要がある。
標語 (大きな声で読み上げよう)
作ろうよ スポーン前に セマフォをさ
解説
- スレッド(ファイバー)が分岐する前にセマフォを作らなければならない。
- そのセマフォを分岐して実行される処理に渡して使ってもらわなければならない。
- 分岐してから作ったセマフォは常に成功するダメセマフォである。
Semaphore
コンストラクタで作られるのはIO[Semaphore[IO]]
であってSemaphore
そのものではない。for
式の中で分岐直前に直接Semaphore[IO](1)
と書くのが一番間違いが少ないように思われる。
オッス!!