Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

Akka(2.8) PersistenceでSerializerとしてCirceを使う方法

Akka PersistenceはAkkaでイベントソーシングを行い、アクターの状態を永続化するためのライブラリである。

doc.akka.io

イベントなどの永続化にJSONライブラリであるCirceを使ったのでその手順をメモしておく。ここではAkka Persistenceの導入そのものの話はしない(そのうち書く)。

Serializer

Akka Persistenceはアクターの状態を永続化するために、DBにイベントをどんどん書き込んでいき、アクターが再起動したときはイベントを読み戻す。しかしアクターが送受信するデータはプレーンなScalaオブジェクトなので、どこかでDBに記録できる形式にエンコードしなければDBに記録できないし、DBから読み戻すこともできない。この変換作業をAkkaではSerializationと呼んでいる。

doc.akka.io

Java serializer(deprecated)

Akkaは標準でJavaの標準機能を使ったSerialization手法を提供している。これはあくまで試験用の実装であり、ナイーブな実装である。Java標準のSerializerを使うので、互換性が弱く、低速だ。

doc.akka.io

したがって本番では絶対に使うべきではないが、特定の設定をapplication.confに設定することで有効化でき、何の設定もなくSerializeできるようになる:

akka.actor.allow-java-serialization = on

Circe

CirceはScala用のそこそこ高速なJSONシリアライザ・デシリアライザだ。ScalaにおけるJSONライブラリとしてはデファクトに近い地位を誇るし、大勢の開発者によって長く丁寧にメンテナンスされている。

circe.github.io

Serializerとしては色々なエンコーディングを使うことができるが(cf. 参考文献)、今回はCirceをSerializerとして使う。

Akka-Serialization-Helper

さて、Serializerを使うときに起こりがちな落とし穴を回避するためのコンパイラプラグインがAkka Serialization Helperだ。今回はこれを使ってCirceをSerializerにする。

github.com

Circeを使ったSerializeでは、以下のような問題にぶつかることがよくある:

  • シリアライズ・デシリアライズする際に、データに対応するEncoder/Decoderが定義されておらず、失敗する
    • Scalaのコードの中では型で守られるが、DBに保存されているデータに対しては型が付かないので実行時に失敗するリスクがある
  • AkkaはSerializeを行うときにSelializerを見付けられない場合デフォルトのJava Serializerにフォールバックする(!!)
    • これにより、意図せずに変なデータがDBに保存されてリプレイに失敗する

Akka Serialization Helperはコンパイラプラグインとして、アクターとのやりとりに使われるメッセージが特定のシリアライザを使っていることを強制し、安全にシリアライズ・デシリアライズできるようにする、というコンセプト。

まずはコンパイラプラグインを入れる:

addSbtPlugin("org.virtuslab.ash" % "sbt-akka-serialization-helper" % "0.7.2")

次にプロジェクトでプラグインを有効化する:

libraryDependencies += AkkaSerializationHelperPlugin.circeAkkaSerializer
(project in file(".")).enablePlugins(AkkaSerializationHelperPlugin)

メッセージのルートとなるtraitを作成する

Akka Serialization Helperを使ったシリアライズは、アクターとのやりとりに使うメッセージすべてを特定のtraitのサブタイプにすることで行う。こうすることで、やりとりに使うメッセージが特定のSerializerを通ることをコンパイル時に確認できるという仕組み。サブタイプではないメッセージを送信しようとした場合、コンパイルが通らなくなる。

package com.github.windymelt.apsiren.protocol
import org.virtuslab.ash.annotation.SerializabilityTrait

@SerializabilityTrait
trait CirceAkkaSerializable extends Product with Serializable

今回はこのCirceAkkaSerializableがすべてのメッセージの頂点に立つ。

シリアライザを定義する

Akkaが使うシリアライザを定義する。Akka Serialization Helperが用意した抽象クラスを実装するだけでよい。

import akka.actor.ExtendedActorSystem
import org.virtuslab.ash.circe.CirceAkkaSerializer
import org.virtuslab.ash.circe.Register
import org.virtuslab.ash.circe.Registration

class CirceSerializer(actorSystem: ExtendedActorSystem)
    extends CirceAkkaSerializer[CirceAkkaSerializable](actorSystem) {

  override def identifier: Int = 12345 // シリアライザを特定する一意な数字

  // 全て自動導出に任せる場合はautoを使う
  // import io.circe.generic.auto._

  // ここに各種メッセージのtraitを登録していく
  override lazy val codecs =
    Seq(
      Register[Message1],
      Register[Message2],
      ...
    )

  override lazy val manifestMigrations = Nil

  override lazy val packagePrefix = "com.github.windymelt.foobarprefix" // このSerializerがこのpackageの中に含まれている必要がある
}

Registerを使ってメッセージクラスを登録している様子が分かると思う。実際の運用ではアクターごとにメッセージはsealed traitなどでまとめられていると思うので、そのtraitを記していけばよい。

application.conf

このシリアライザを使うことをapplication.confで宣言しておく必要がある:

akka.actor {
  serializers {
    circe = "com.github.windymelt.foobarprefix.protocol.CirceSerializer"
  }
  serialization-bindings {
    "com.github.windymelt.foobarprefix.protocol.CirceAkkaSerializable" = circe
  }
  allow-java-serialization = off
}

この記述により、CirceAkkaSerializableのサブタイプは全てCirceSerializerでシリアライズ・デシリアライズされるようになる。

実際のメッセージクラス

実際のメッセージクラスは、CirceAkkaSerializableを継承しておけばよい。

以下に自分のOSSでの実例を示す:

object Followers {
  final case class Follower(
      url: String,
      inbox: String
  )
  sealed trait Command extends CirceAkkaSerializable

  final case class Add(user: Follower, replyTo: ActorRef[Ok.type])
      extends Command

  final case class Remove(url: String, replyTo: ActorRef[Ok.type])
      extends Command

  final case class GetAll(replyTo: ActorRef[Followers]) extends Command
  final case class GetCount(replyTo: ActorRef[Int]) extends Command

  sealed trait Result extends CirceAkkaSerializable
  final case object Ok extends Result
  final case class Followers(followers: Iterable[Follower]) extends Result

  // actor event(to be persisted)
  sealed trait Event extends CirceAkkaSerializable
  final case class Added(user: Follower) extends Event
  final case class Removed(user: String) extends Event
}

blog.3qe.us

実際に記録される値

実際にどういう値が記録されるか見てみよう。以下のようなSQLをイベントソーシングのバックエンドDBに発行するとどのようなJSONが保存されているか見ることができる。

select convert_from(event_payload, 'UTF8') from event_journal limit 1
{"Added":{"user":{"url":"*******","inbox":"*******"}}}

普通のJSONがBYTEA形式でUTF-8エンコーディングされて保存されていることがわかった。

まとめ

  • Akka Serialization Helperの力を借りながら、CirceをAkka PersistenceのSerializerに設定し、DBにJSON形式でイベントを保存できた。

参考文献

Kryoを使う例

qiita.com

★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?