Akka PersistenceはAkkaでイベントソーシングを行い、アクターの状態を永続化するためのライブラリである。
イベントなどの永続化にJSONライブラリであるCirceを使ったのでその手順をメモしておく。ここではAkka Persistenceの導入そのものの話はしない(そのうち書く)。
- Serializer
- Java serializer(deprecated)
- Circe
- Akka-Serialization-Helper
- メッセージのルートとなるtraitを作成する
- シリアライザを定義する
- application.conf
- 実際のメッセージクラス
- 実際に記録される値
- まとめ
- 参考文献
Serializer
Akka Persistenceはアクターの状態を永続化するために、DBにイベントをどんどん書き込んでいき、アクターが再起動したときはイベントを読み戻す。しかしアクターが送受信するデータはプレーンなScalaオブジェクトなので、どこかでDBに記録できる形式にエンコードしなければDBに記録できないし、DBから読み戻すこともできない。この変換作業をAkkaではSerializationと呼んでいる。
Java serializer(deprecated)
Akkaは標準でJavaの標準機能を使ったSerialization手法を提供している。これはあくまで試験用の実装であり、ナイーブな実装である。Java標準のSerializerを使うので、互換性が弱く、低速だ。
したがって本番では絶対に使うべきではないが、特定の設定をapplication.conf
に設定することで有効化でき、何の設定もなくSerializeできるようになる:
akka.actor.allow-java-serialization = on
Circe
CirceはScala用のそこそこ高速なJSONシリアライザ・デシリアライザだ。ScalaにおけるJSONライブラリとしてはデファクトに近い地位を誇るし、大勢の開発者によって長く丁寧にメンテナンスされている。
Serializerとしては色々なエンコーディングを使うことができるが(cf. 参考文献)、今回はCirceをSerializerとして使う。
Akka-Serialization-Helper
さて、Serializerを使うときに起こりがちな落とし穴を回避するためのコンパイラプラグインがAkka Serialization Helperだ。今回はこれを使ってCirceをSerializerにする。
Circeを使ったSerializeでは、以下のような問題にぶつかることがよくある:
- シリアライズ・デシリアライズする際に、データに対応する
Encoder
/Decoder
が定義されておらず、失敗する- Scalaのコードの中では型で守られるが、DBに保存されているデータに対しては型が付かないので実行時に失敗するリスクがある
- AkkaはSerializeを行うときにSelializerを見付けられない場合デフォルトのJava Serializerにフォールバックする(!!)
- これにより、意図せずに変なデータがDBに保存されてリプレイに失敗する
Akka Serialization Helperはコンパイラプラグインとして、アクターとのやりとりに使われるメッセージが特定のシリアライザを使っていることを強制し、安全にシリアライズ・デシリアライズできるようにする、というコンセプト。
まずはコンパイラプラグインを入れる:
addSbtPlugin("org.virtuslab.ash" % "sbt-akka-serialization-helper" % "0.7.2")
次にプロジェクトでプラグインを有効化する:
libraryDependencies += AkkaSerializationHelperPlugin.circeAkkaSerializer
(project in file(".")).enablePlugins(AkkaSerializationHelperPlugin)
メッセージのルートとなるtraitを作成する
Akka Serialization Helperを使ったシリアライズは、アクターとのやりとりに使うメッセージすべてを特定のtraitのサブタイプにすることで行う。こうすることで、やりとりに使うメッセージが特定のSerializerを通ることをコンパイル時に確認できるという仕組み。サブタイプではないメッセージを送信しようとした場合、コンパイルが通らなくなる。
package com.github.windymelt.apsiren.protocol import org.virtuslab.ash.annotation.SerializabilityTrait @SerializabilityTrait trait CirceAkkaSerializable extends Product with Serializable
今回はこのCirceAkkaSerializable
がすべてのメッセージの頂点に立つ。
シリアライザを定義する
Akkaが使うシリアライザを定義する。Akka Serialization Helperが用意した抽象クラスを実装するだけでよい。
import akka.actor.ExtendedActorSystem import org.virtuslab.ash.circe.CirceAkkaSerializer import org.virtuslab.ash.circe.Register import org.virtuslab.ash.circe.Registration class CirceSerializer(actorSystem: ExtendedActorSystem) extends CirceAkkaSerializer[CirceAkkaSerializable](actorSystem) { override def identifier: Int = 12345 // シリアライザを特定する一意な数字 // 全て自動導出に任せる場合はautoを使う // import io.circe.generic.auto._ // ここに各種メッセージのtraitを登録していく override lazy val codecs = Seq( Register[Message1], Register[Message2], ... ) override lazy val manifestMigrations = Nil override lazy val packagePrefix = "com.github.windymelt.foobarprefix" // このSerializerがこのpackageの中に含まれている必要がある }
Register
を使ってメッセージクラスを登録している様子が分かると思う。実際の運用ではアクターごとにメッセージはsealed trait
などでまとめられていると思うので、そのtrait
を記していけばよい。
application.conf
このシリアライザを使うことをapplication.conf
で宣言しておく必要がある:
akka.actor { serializers { circe = "com.github.windymelt.foobarprefix.protocol.CirceSerializer" } serialization-bindings { "com.github.windymelt.foobarprefix.protocol.CirceAkkaSerializable" = circe } allow-java-serialization = off }
この記述により、CirceAkkaSerializable
のサブタイプは全てCirceSerializer
でシリアライズ・デシリアライズされるようになる。
実際のメッセージクラス
実際のメッセージクラスは、CirceAkkaSerializable
を継承しておけばよい。
以下に自分のOSSでの実例を示す:
object Followers { final case class Follower( url: String, inbox: String ) sealed trait Command extends CirceAkkaSerializable final case class Add(user: Follower, replyTo: ActorRef[Ok.type]) extends Command final case class Remove(url: String, replyTo: ActorRef[Ok.type]) extends Command final case class GetAll(replyTo: ActorRef[Followers]) extends Command final case class GetCount(replyTo: ActorRef[Int]) extends Command sealed trait Result extends CirceAkkaSerializable final case object Ok extends Result final case class Followers(followers: Iterable[Follower]) extends Result // actor event(to be persisted) sealed trait Event extends CirceAkkaSerializable final case class Added(user: Follower) extends Event final case class Removed(user: String) extends Event }
実際に記録される値
実際にどういう値が記録されるか見てみよう。以下のようなSQLをイベントソーシングのバックエンドDBに発行するとどのようなJSONが保存されているか見ることができる。
select convert_from(event_payload, 'UTF8') from event_journal limit 1
{"Added":{"user":{"url":"*******","inbox":"*******"}}}
普通のJSONがBYTEA
形式でUTF-8エンコーディングされて保存されていることがわかった。
まとめ
- Akka Serialization Helperの力を借りながら、CirceをAkka PersistenceのSerializerに設定し、DBにJSON形式でイベントを保存できた。
参考文献
Kryoを使う例