この記事では、プログラミング言語Scalaにおいて関数型ライブラリCatsをベースとした非同期処理(グリーンスレッド)ライブラリである Cats Effect(CE) において、プロセスがSIGINTを受信した場合にどのようにCEが振る舞うかを解説する。
またこの記事は執筆時点で最新であるCE 3を対象とする。
IOApp
もしあなたがCEユーザであるなら、おそらくIOApp
トレイトを使ったMain
オブジェクトをScalaプログラムのエントリポイントに設定している場合がほとんどだろう。
//> using scala 3.2 //> using dep "org.typelevel::cats-effect:3.5.0" import cats.effect.{IO, IOApp, ExitCode} import cats.effect.std.Console object Main extends IOApp: def run(args: List[String]) = for { _ <- IO.print("Enter your name: ") name <- Console[IO].readLine _ <- IO.println(s"Hello, $name") } yield ExitCode.Success
このプログラムを実行すると、Scalaはプロンプトとともにあなたの名前を尋ね、そして入力を待ち受ける。名前が入力されると、それを表示して終了する。
もしプログラムが入力を待ち受けている間にあなたがCtrl+C
を押下した場合、プログラムはそのまま終了する。当たり前に見えるかもしれないが、複雑な処理が行なわれていて、それはIOApp
のベールの下に隠蔽されている。
CEはIOApp
にシグナルハンドリングの責務を負わせている。つまり、SIGINT
などのシグナルを受け取ったとき、IOApp
がこれをハンドルし、実行中の同期/非同期処理を 適切に終了 させるのだ。
例えば、run
が何らかのリソースを確保している場合にSIGINT
を受信した場合でも、IOApp
はリソースを解放してからプログラムを終了させようとする:
//> using scala 3.2 //> using dep "org.typelevel::cats-effect:3.5.0" import cats.effect.{IO, IOApp, ExitCode, Resource} import cats.effect.std.Console object Main extends IOApp: val importantResource: Resource[IO, Int] = Resource.make(IO.println("Acquiring resource...") >> IO.pure(42))(_ => IO.println("Releasing resource...") ) def run(args: List[String]) = for { re <- importantResource.use { r => for { _ <- IO.print("Enter your name: ") name <- Console[IO].readLine _ <- IO.println(s"Hello, $name") } yield () } } yield ExitCode.Success
Acquiring resource... Enter your name: ^CReleasing resource...
上に示したように、入力待ち状態でSIGINT
を送ってみるときちんとimportantResource
の解放処理が入っていることがわかる。これは、IOApp
がきちんとSIGINT
をハンドリングしている証左だ。
具体的にどこでシグナルがハンドリングされているか見ていこう。
IOApp
はJavaランタイムのシャットダウンフックにhandleShutDown
メソッドを登録する。- SIGINTがプロセスに届くと、Javaランタイムはあらかじめ登録されたフックである
handleShutDown
を呼び出す。 handleShutDown
は、メインFiberをキャンセルし、事前に設定されたタイムアウトまで猶予を与える。- Fiberがキャンセル完了するかタイムアウトが到来したとき、軽量スレッドの管理用ランタイムである
runtime.shutdown()
が呼び出され、ネイティブスレッドなどが引き上げていき終了する。
このように、SIGINTを受信した場合はメインFiberに対してcancel
メソッドが呼び出される。Fiberがキャンセルされるとき、それが呼び出しているFiberを連鎖的にキャンセルしていくので、最終的に全てのFiberが停止されるようになっている。ちなみにFiberとはCEにおける軽量スレッドで、GoのGoroutineのようなものであり、ランタイムによって制御された実行単位である。
uncancelable
シグナルによってFiberが停止されることはわかったが、中途半端な状態で停止することが許されないFiberも存在する。例えばファイルをオープンしている場合は閉じなければならないし、外部プロセスを呼び出している場合は終了まで待たなければならない。クリティカルなセクションでは停止できない。
このような目的のために、CEはuncancelable
というメソッドを用意している。これが呼び出されたメソッドは、それが自発的に終了するまでキャンセル不可能になる。例えば、以下のコードは10秒間待つ間はキャンセルできない:
//> using scala 3.2 //> using dep "org.typelevel::cats-effect:3.5.0" import cats.effect.{IO, IOApp, ExitCode, Resource} import cats.effect.std.Console object Main extends IOApp: def run(args: List[String]) = for { _ <- IO.print("Waiting 10 sec") _ <- IO .sleep(scala.concurrent.duration.FiniteDuration(10, "seconds")) .uncancelable _ <- IO.println("Done!") } yield ExitCode.Success
uncancelableはいたるところで使われ、不整合な状態でプログラムが停止することを防いでいる。ちなみに、キャンセル不可能にはせずにキャンセル時に特別な処理を行うonCancel
メソッドも用意されているので、うまく活用したい。
あわせて読みたい
https://typelevel.org/cats-effect/api/3.x/cats/effect/IOApp.html