Akka 2.8でActorがクラッシュした場合にどのように振る舞うかについて調べる機会があったのでメモ。
ライブラリ依存性は以下の通り:
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.8.3
Actorの監視
通常のプログラムは例外をキャッチしないと最終的にプログラムが終了するが、Akkaはフォールトトレラントなシステムを指向しており、障害からの便利な回復手段を提供している。そのうちの一つが監視(Supervision)である。
監視されたActorが何らかの理由でクラッシュしたとき、そのActorを監視していた別のActorにメッセージが送信される。これを受けて監視Actorは何らかのアクションを取る。
例: UnstableActor
以下のような、たまに壊れるActorを用意した:
import akka.actor.typed.Behavior import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors import scala.util.Random object UnstableActor { def apply(): Behavior[String] = Behaviors.receiveMessage { case "hello" => println("world!") Behaviors.same case _ => if(Random.nextDouble() < 0.1) { throw new Exception("CRASH!!") } Behaviors.same } }
context.watch
: 最も原始的な監視
最も原始的な監視はcontext.watch
だ。あるActorがcontext.watch(someActorRef)
した上でsomeActorRef
が終了(正常終了であれクラッシュであれ)した場合、監視していたActorも道連れに終了するという挙動になる。子Actorのうちどれかがクラッシュしたら作業が続行できないような場合に使うことができる。
import akka.actor.typed.scaladsl.Behaviors val rootBehavior = Behaviors.setup[Nothing] { context => // Actorをスポーンする val actor = context.spawn(UnstableActor(), "UnstableActor") // Actorを監視する context.watch(actor) ... }
厳密にはwatch
はAkkaでいうところのSuperviseではないようだが、便宜上これも監視だと思っている。
しかし、単にwatch
するだけだと結局アプリケーション全体に障害が波及して停止してしまう。
Behaviors.supervise()
より高度な監視戦略として、akka.actor.typed.SupervisorStrategy
が用意されている。これを利用するにはBehaviors.supervise()
を使う:
import akka.actor.typed.Behavior import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors object UnstableActor { def supervisedBehavior: Behavior[String] = Behaviors .supervise(apply()) .onFailure(SupervisorStrategy.restart) ... }
UnstableActor()
はそれ単体だと単なる不安定なActorだが、supervise
でBehavior
を包み、onFailure
でクラッシュ時の戦略を設定することで勝手に再起動するようになる。
SupervisorStrategy
では、restart
以外にもいくつかの戦略を提供する:
- (
restart
: クラッシュした場合再びActorを作り直す) stop
: クラッシュした場合そのままActorを停止させる- ->
watch
したアクターに通知が行く
- ->
resume
: 内部状態を保ったままActorを再利用する- エラーを無視するが、内部状態が壊れている場合は無限ループに入る危険もともなう
restartWithBackoff
:restart
するが、exponential backoffによって再起動時間を延ばす- Jitter要素があるのでリトライタイミングの負荷分散が行なわれる
- 詳細はscaladocを参照のこと
また、restart
やresume
には時間あたりの回数制限を設定できる:
import akka.actor.typed.Behavior import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors import scala.concurrent.duration.Duration object UnstableActor { def supervisedBehavior: Behavior[String] = Behaviors .supervise(registry(Map.empty)) .onFailure( SupervisorStrategy.restart.withLimit(5, Duration(1, "minute")) ) ... }
この例では1分以内に5回を超えてクラッシュするとActorは再起動を中止して停止する。
ちなみにActorに送信したメッセージはどのSupervise strategyをとっても喪失するので、送信側がリトライするなどの管理を行う必要がある(Actor側は関知しない)。未処理のメッセ=ジはActorに渡されていないので、Actorはクラッシュした次のメッセージから処理し始めることになる。詳しくは以下の文献を参考。
またAkkaはリトライ用のユーティリティを用意しているので、これをask
と組み合わせて使うと良いだろう: