しょうもなグラムというパロディSNSを作っている。キラキラした投稿ではなく、しょうもない投稿を褒め称えていくというコンセプトのSNSで、「いいね」ボタンの代わりに「しょうもな」ボタンがある。ハートマークではなく、ウンコのマークになっている。
まあそれはさておき、しょうもなグラムのフロントエンドはNext.jsで書かれており、Apolloを使ってバックエンドのScalaサーバとGraphQLで連絡するという設計にしてある。これは趣味というか、勉強のためにそうしている。
SNSという特性上、GraphQLのsubscription機能を使って新着投稿を自動受信できるようになっていたら面白い、と考えていたところで、まさにそのための実装が公開されていた。今回はこの紹介。
- 前提
- 実装
- ライブラリたち
- 最前段: handleWebSocketMessages
- graphQlSubscriptionSocket
- SubscriptionActor
- SubscriptionEventPublisher
- まとめ
前提
- GraphQLにはsubscriptionというオペレーションがある
- subscriptionすると、それに対応する変更が生じたとき、データをサーバがプッシュしてくれる
- subscriptionはHTTPとWebSocketとの2つのプロトコルで利用できるが、接続を維持する必要があるという実用上の理由で主にWebSocketが使われている
実装
SangriaのOrganizationにサンプル実装が公開されている(Sangria本体にマージはされていない様子)。これの内部構造を眺めてみよう。
ライブラリたち
まずこのシステムは、Akka HTTPとAkka Streams、そしてSangriaを利用したシステムである。
- Akka HTTP
- Akkaを使ったHTTPサーバ・クライアント実装
- ノンブロッキングIOが特徴的
- WebSocket用のバインディングが存在しており、双方向ストリームの形でユーザがハンドルできるようにしてくれる
- Akka Streams
- Akkaを使ったReactive Streamの実装
- どんどん流れてくる情報をうまくハンドルするための仕組み
- Sangria
- ScalaのGraphQL実装
- これ自体にはsubscriptionするためのWebSocket実装は含まれていない
最前段: handleWebSocketMessages
- 責務: WebSocket通信をAkka StreamsのFlowとして扱えるようにする
subscriptionのためのリクエストは特定のWebSocketエンドポイントにやってくる。Akka HTTPを使って、このエンドポイントに対する処理を実装している。
val subscriptionEventPublisher = system actorOf Props(new SubscriptionEventPublisher(eventStorePublisher)) // 略 path("graphql") { post { // 略 executeQuery(query, operation, vars) } } ~ get(handleWebSocketMessages(graphQlSubscriptionSocket(subscriptionEventPublisher, ctx)))
/graphqlにPOSTされてきたリクエストは通常のクエリとして捌いているが、GETしに来た場合はhandleWebSocketMessagesに処理を委ねている。
handleWebSocketMessagesは独自実装ではなく、Akka HTTPが提供するディレクティブである。どういう挙動をするかは、シグニチャを見るとおおよその検討がつく。
def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route
まず返り値はRouteである。内部に何らかの引数を渡すと、最終的にルーティングにしてくれるというディレクティブである。
では引数の型はというとhandler: Flow[Message, Message, Any]である。FlowというのはAkka Streamsの概念で、「なんかのストリームを受け取って加工し、なんかのストリームを返す」ような構成要素のことである。FlowやSource、SinkといったAkka Streamsの構成要素については以下が詳しい。
handlerのシグニチャはFlow[Message, Message, Any]になっているから、Messageを入力してMessageを出力するようなFlowであればなんでもよい、という意味。ここでのMessageとは、WebSocket上を流れるメッセージのこと。
ここでもう一度ルーティング箇所の記述を見てみると、handleWebSocketMessagesに引数として渡されているgraphQlSubscriptionSocket(subscriptionEventPublisher, ctx)の型がFlow[Message, Message, _]になっているという検討がつく。このgraphQlSubscriptionSocketの実装について見てみよう。
graphQlSubscriptionSocket
- 責務: WebSocket経由のSubscriptionメッセージを受け取り、更新があったらWebSocketメッセージを返す
graphQlSubscriptionSocketはこのプロジェクトの独自実装となっている。
シグニチャは以下の通り。
def graphQlSubscriptionSocket(publisher: ActorRef, ctx: Ctx)(implicit system: ActorSystem, materializer: ActorMaterializer, timeout: Timeout)
implicitに指定されている引数はアクターまわりを扱う際に登場する定番引数なので、あまり詳しく説明しないが、ActorSystemはアクターの起動などの管理を行うクラスで、ActorMaterializerはアクターが実際に稼動するスレッドなどの管理を行っている。Timeoutは一定時間の応答がなかった場合に接続を切るためのものだろう。
重要なのは、publisherである。publisherの型はActorRef、つまりアクターへの参照なので、ここにメッセージを送信したり、メッセージがここから送られてくるという想像ができる。(ctxはGraphQLでいうところのコンテキストオブジェクトで、ここではあまり関係が無さそうだったので割愛する。)
graphQlSubscriptionSocketでは、以下の事を行っている。
- 新たに
SubscriptionActorを起動している。 incomingという名のSinkを定義している。outgoingという名のSourceを定義している。incomingとoutgoingとを合成してFlowに変換している(返り値)。
それぞれについて、順を追って見ていこう。まずは実装をここに引用するので、適宜見ながら読んでほしい。
import akka.NotUsed import akka.actor.{ActorRef, ActorSystem, PoisonPill, Props} import akka.http.scaladsl.model.ws._ import akka.stream.{ActorMaterializer, OverflowStrategy} import akka.stream.scaladsl._ import akka.util.Timeout import spray.json._ import scala.util._ trait SubscriptionSupport { import SubscriptionActor._ def graphQlSubscriptionSocket(publisher: ActorRef, ctx: Ctx)(implicit system: ActorSystem, materializer: ActorMaterializer, timeout: Timeout) = { val subscriptionActor = system.actorOf(Props(new SubscriptionActor(publisher, ctx))) // Transform any incoming messages into Subscribe messages and let the subscription actor know about it val incoming = Flow[Message] .collect { case TextMessage.Strict(input) => Try(input.parseJson.convertTo[Subscribe]) } .collect { case Success(subscription) => subscription } .to(Sink.actorRef[Subscribe](subscriptionActor, PoisonPill)) // connect the subscription actor with the outgoing WebSocket actor and transform a result into a WebSocket message. val outgoing = Source.actorRef[QueryResult](10, OverflowStrategy.fail) .mapMaterializedValue { outputActor => subscriptionActor ! Connected(outputActor) NotUsed } .map { msg: SubscriptionMessage => msg match { case result: QueryResult => TextMessage(result.json.compactPrint) case result: SubscriptionAccepted => TextMessage("subscription accepted.") } } Flow.fromSinkAndSource(incoming, outgoing) } }
subscriptionActor
graphQlSubscriptionSocketは、まずsubscriptionActorを作成している。引数としてpublisherとctxを渡しているので、おそらく更新されたときのデータを配ってくれるのだろうという検討がつく。subscriptionActorについては後述する。
incoming
// Transform any incoming messages into Subscribe messages and let the subscription actor know about it val incoming = Flow[Message] .collect { case TextMessage.Strict(input) => Try(input.parseJson.convertTo[Subscribe]) } .collect { case Success(subscription) => subscription } .to(Sink.actorRef[Subscribe](subscriptionActor, PoisonPill))
Flowから始まっているのでFlowであるかのような錯覚を覚えるが、最終的に.to(Sink.actorRef)の形式になっているので全体としてはSinkになる。このSinkがやっている事は以下の通り。
Messageを受け取るが、TextMessage.Strictであるもののみを選択的に受け付ける(そうでないものは捨てる)。- メッセージがJSONであると仮定してデシリアライズする。デシリアライズ先として
Subscribeクラスが選ばれている。 - デシリアライズに失敗した場合は捨てる。
Subscribeクラスに変換できたなら、subscriptionActorに順次送信していく。(接続が途切れてストリームが終了した場合は、PoisonPillという特殊なメッセージを送信する)PoisonPillを受信したアクターは死ぬ。(命名がすごい)
Sink.actorRefを呼び出しているのがミソで、Flowで流れてきたメッセージを最終的にアクターに流し込むという構造にできる。アクターは何も返さないので、全体としてはSinkになる。
outgoing
// connect the subscription actor with the outgoing WebSocket actor and transform a result into a WebSocket message. val outgoing = Source.actorRef[QueryResult](10, OverflowStrategy.fail) .mapMaterializedValue { outputActor => subscriptionActor ! Connected(outputActor) NotUsed } .map { msg: SubscriptionMessage => msg match { case result: QueryResult => TextMessage(result.json.compactPrint) case result: SubscriptionAccepted => TextMessage("subscription accepted.") } }
incomingと対になって実装されているのがoutgoing。こちらは全体としてSourceになり、データストリームの源泉として振る舞う。このSourceがやっている事は以下の通り。
Source.actorRefを使ってアクターを生成する。subscriptionActorに対して、生成したアクターの参照を渡す。渡すときはConnected(参照)の形をとっている。subscriptionActorが先程生成したアクターに対してSubscriptionMessageを投げてくれるので、WebSocketのTextMessageへと変換して返す。
Source.actorRefの挙動は結構難しいと思うので解説する。
前提知識として、Akka StreamsにおけるSource/Flow/Sinkという部品には、「部品が入力する値」「部品が出力する値」「部品を作成したタイミングで返す値」の3種類の型がある。ここでSource.actorRefは、Source.actorRef[QueryResult]として呼び出されているので、以下のような型を持つ。
Sourceとして振る舞うので、「部品が入力する値」に対応する型は無い。Sourceとして振る舞う上での「部品が出力する値」に対応する型はQueryResultである。- 「部品を作成したタイミングで返す値」の型は、
ActorRefである。
そして「部品を作成したタイミングで返す値」のことを、Akka Streams用語でMaterialized Valueと呼ぶ。コード中のmapMaterializedValueは、このMaterialized Valueをマップしているので、部品自体が流す値には影響しない。
ではmapMaterializedValueで何をしているかというと、Source.actorRefによって得られたMaterialized ValueであるActorRefを、「値はこのアクターに送ってください」とsubscriptionActorに引き渡している。詳細はこの先登場するが、subscriptionActorは、データが更新されたとき、Connectedで渡されたアクターにデータを送信してくれる。
.mapMaterializedValue { outputActor =>
subscriptionActor ! Connected(outputActor)
NotUsed
}
最後にNotUsedを返しているのは、「もうこのMaterialized Valueは使わないです」という宣言である。
subscriptionActorがこのoutputActorにデータを送信してくれるようになるので、それを受け取り加工することでoutgoingはSourceとしての役割を果たすようになる。
Flow.fromSinkAndSource
SinkとSourceとが1つずつあるとき、これをFlowとみなすことができる。データが直接渡ってくるというFlowの一般的な関係ではないものの、Akka Streams的にはSinkとSourceがあったら合成してFlowだとみなすことができるのである。これを実現するのがFlow.fromSinkAndSourceで、読んで字の如くといった合成をしてくれる。
Flow.fromSinkAndSource(incoming, outgoing)
subscriptionのような非同期的になってしまう処理は、このようにアクターを経由したSinkとSourceとを別々に作り、最終的に合成してFlowにするというのが、一種のパターンになっているようだ。
この結果得られたFlowは、「subscriptionさせてください」というWebSocketメッセージを入力として受け取り、それに応じたデータ更新があったときにWebSocketメッセージの出力として返す、といった挙動になる。
SubscriptionActor
- 責務:
publisherに参加する。Subscribeを受け付け、可能なクエリかどうか確認する。クエリを事前にPrepareする。クエリのPrepareができたことを報告する。接続あたり複数のsubscriptionを扱えるようにクエリを管理する。データ更新を受信し、対応したクエリを実行する。
かなり難しいことをしている!
subscriptionしたクエリと、発生するイベントとの間の対応を取るような調整を行っている。まだうまく理解できていない…………。
SubscriptionEventPublisher
- 責務: イベントが発生したとき、登録済みのアクターに対してイベントをブロードキャストする。登録とその解除を管理する。
Server.scalaで作成されている。さらにeventStorePublisherを引数に取っているが、これはorg.reactivestreams.Publisherであればなんでもよい。これがどんどん情報を発してくるので、購読したアクターに対して配ってあげるというのが、SubscriptionEventPublisherの仕事。
val subscriptionEventPublisher = system actorOf Props(new SubscriptionEventPublisher(eventStorePublisher))
実装を見てみる。
import akka.actor.{Actor, ActorLogging, ActorRef, Terminated} import akka.stream.scaladsl.{Sink, Source} import akka.stream.{ActorMaterializer, OverflowStrategy} import generic.Event import org.reactivestreams.Publisher object SubscriptionEventPublisher { case object Join } class SubscriptionEventPublisher(publisher: Publisher[Event]) extends Actor with ActorLogging { import SubscriptionEventPublisher._ implicit val materializer = ActorMaterializer() var subscribers: Set[ActorRef] = Set.empty Source.fromPublisher(publisher) .buffer(100, OverflowStrategy.fail) .to(Sink.foreach(e => subscribers.foreach(_ ! e))) .run() def receive: Receive = { case Join => log.info(s"${sender()} joined.") subscribers += sender() context.watch(sender()) case Terminated(subscriber) => log.info(s"${sender()} was terminated.") subscribers -= subscriber } }
こちらは比較的簡単。publisherをSourceとして、バッファリングしつつ、登録済みのアクターへとブロードキャストしている。JoinやTerminatedを受信したときは、登録したり解除したりしている。
まとめ

こういう感じの動作になっている。Akka Streamsをうまく使ってブロードキャストや購読といった仕組みをうまく実装できているのが面白かった。Akka HTTPも良くできていて、普通にWebSocketを扱えるのがすごいと思う。