Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

Akka(2.8)でアクターの監視(Supervise)を行う

Akka 2.8でActorがクラッシュした場合にどのように振る舞うかについて調べる機会があったのでメモ。

ライブラリ依存性は以下の通り:

libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.8.3

Actorの監視

通常のプログラムは例外をキャッチしないと最終的にプログラムが終了するが、Akkaはフォールトトレラントなシステムを指向しており、障害からの便利な回復手段を提供している。そのうちの一つが監視(Supervision)である。

監視されたActorが何らかの理由でクラッシュしたとき、そのActorを監視していた別のActorにメッセージが送信される。これを受けて監視Actorは何らかのアクションを取る。

例: UnstableActor

以下のような、たまに壊れるActorを用意した:

import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import scala.util.Random

object UnstableActor {
  def apply(): Behavior[String] = Behaviors.receiveMessage {
    case "hello" =>
      println("world!")
      Behaviors.same
    case _ =>
      if(Random.nextDouble() < 0.1) {
        throw new Exception("CRASH!!")
      }
      Behaviors.same
  }
}

context.watch: 最も原始的な監視

最も原始的な監視はcontext.watchだ。あるActorがcontext.watch(someActorRef)した上でsomeActorRefが終了(正常終了であれクラッシュであれ)した場合、監視していたActorも道連れに終了するという挙動になる。子Actorのうちどれかがクラッシュしたら作業が続行できないような場合に使うことができる。

import akka.actor.typed.scaladsl.Behaviors

val rootBehavior = Behaviors.setup[Nothing] { context =>

  // Actorをスポーンする
  val actor = context.spawn(UnstableActor(), "UnstableActor")

  // Actorを監視する
  context.watch(actor)

  ...
}

厳密にはwatchはAkkaでいうところのSuperviseではないようだが、便宜上これも監視だと思っている。

しかし、単にwatchするだけだと結局アプリケーション全体に障害が波及して停止してしまう。

Behaviors.supervise()

より高度な監視戦略として、akka.actor.typed.SupervisorStrategyが用意されている。これを利用するにはBehaviors.supervise()を使う:

import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors

object UnstableActor {
  def supervisedBehavior: Behavior[String] = Behaviors
    .supervise(apply())
    .onFailure(SupervisorStrategy.restart)

  ...
}

UnstableActor()はそれ単体だと単なる不安定なActorだが、superviseBehaviorを包み、onFailureでクラッシュ時の戦略を設定することで勝手に再起動するようになる。

SupervisorStrategyでは、restart以外にもいくつかの戦略を提供する:

  • (restart: クラッシュした場合再びActorを作り直す)
  • stop: クラッシュした場合そのままActorを停止させる
    • -> watchしたアクターに通知が行く
  • resume: 内部状態を保ったままActorを再利用する
    • エラーを無視するが、内部状態が壊れている場合は無限ループに入る危険もともなう
  • restartWithBackoff: restartするが、exponential backoffによって再起動時間を延ばす
    • Jitter要素があるのでリトライタイミングの負荷分散が行なわれる
    • 詳細はscaladocを参照のこと

また、restartresumeには時間あたりの回数制限を設定できる:

import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration.Duration

object UnstableActor {
  def supervisedBehavior: Behavior[String] = Behaviors
    .supervise(registry(Map.empty))
    .onFailure(
      SupervisorStrategy.restart.withLimit(5, Duration(1, "minute"))
    )


  ...
}

この例では1分以内に5回を超えてクラッシュするとActorは再起動を中止して停止する。

ちなみにActorに送信したメッセージはどのSupervise strategyをとっても喪失するので、送信側がリトライするなどの管理を行う必要がある(Actor側は関知しない)。未処理のメッセ=ジはActorに渡されていないので、Actorはクラッシュした次のメッセージから処理し始めることになる。詳しくは以下の文献を参考。

doc.akka.io

またAkkaはリトライ用のユーティリティを用意しているので、これをaskと組み合わせて使うと良いだろう:

doc.akka.io

参考文献

doc.akka.io

★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?