しょうもなグラムというパロディSNSを作っている。キラキラした投稿ではなく、しょうもない投稿を褒め称えていくというコンセプトのSNSで、「いいね」ボタンの代わりに「しょうもな」ボタンがある。ハートマークではなく、ウンコのマークになっている。
まあそれはさておき、しょうもなグラムのフロントエンドはNext.jsで書かれており、Apolloを使ってバックエンドのScalaサーバとGraphQLで連絡するという設計にしてある。これは趣味というか、勉強のためにそうしている。
SNSという特性上、GraphQLのsubscription機能を使って新着投稿を自動受信できるようになっていたら面白い、と考えていたところで、まさにそのための実装が公開されていた。今回はこの紹介。
- 前提
- 実装
- ライブラリたち
- 最前段: handleWebSocketMessages
- graphQlSubscriptionSocket
- SubscriptionActor
- SubscriptionEventPublisher
- まとめ
前提
- GraphQLにはsubscriptionというオペレーションがある
- subscriptionすると、それに対応する変更が生じたとき、データをサーバがプッシュしてくれる
- subscriptionはHTTPとWebSocketとの2つのプロトコルで利用できるが、接続を維持する必要があるという実用上の理由で主にWebSocketが使われている
実装
SangriaのOrganizationにサンプル実装が公開されている(Sangria本体にマージはされていない様子)。これの内部構造を眺めてみよう。
ライブラリたち
まずこのシステムは、Akka HTTPとAkka Streams、そしてSangriaを利用したシステムである。
- Akka HTTP
- Akkaを使ったHTTPサーバ・クライアント実装
- ノンブロッキングIOが特徴的
- WebSocket用のバインディングが存在しており、双方向ストリームの形でユーザがハンドルできるようにしてくれる
- Akka Streams
- Akkaを使ったReactive Streamの実装
- どんどん流れてくる情報をうまくハンドルするための仕組み
- Sangria
- ScalaのGraphQL実装
- これ自体にはsubscriptionするためのWebSocket実装は含まれていない
最前段: handleWebSocketMessages
- 責務: WebSocket通信をAkka StreamsのFlowとして扱えるようにする
subscriptionのためのリクエストは特定のWebSocketエンドポイントにやってくる。Akka HTTPを使って、このエンドポイントに対する処理を実装している。
val subscriptionEventPublisher = system actorOf Props(new SubscriptionEventPublisher(eventStorePublisher)) // 略 path("graphql") { post { // 略 executeQuery(query, operation, vars) } } ~ get(handleWebSocketMessages(graphQlSubscriptionSocket(subscriptionEventPublisher, ctx)))
/graphql
にPOSTされてきたリクエストは通常のクエリとして捌いているが、GETしに来た場合はhandleWebSocketMessages
に処理を委ねている。
handleWebSocketMessages
は独自実装ではなく、Akka HTTPが提供するディレクティブである。どういう挙動をするかは、シグニチャを見るとおおよその検討がつく。
def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route
まず返り値はRoute
である。内部に何らかの引数を渡すと、最終的にルーティングにしてくれるというディレクティブである。
では引数の型はというとhandler: Flow[Message, Message, Any]
である。Flow
というのはAkka Streamsの概念で、「なんかのストリームを受け取って加工し、なんかのストリームを返す」ような構成要素のことである。FlowやSource、SinkといったAkka Streamsの構成要素については以下が詳しい。
handler
のシグニチャはFlow[Message, Message, Any]
になっているから、Message
を入力してMessage
を出力するようなFlow
であればなんでもよい、という意味。ここでのMessage
とは、WebSocket上を流れるメッセージのこと。
ここでもう一度ルーティング箇所の記述を見てみると、handleWebSocketMessages
に引数として渡されているgraphQlSubscriptionSocket(subscriptionEventPublisher, ctx)
の型がFlow[Message, Message, _]
になっているという検討がつく。このgraphQlSubscriptionSocket
の実装について見てみよう。
graphQlSubscriptionSocket
- 責務: WebSocket経由のSubscriptionメッセージを受け取り、更新があったらWebSocketメッセージを返す
graphQlSubscriptionSocket
はこのプロジェクトの独自実装となっている。
シグニチャは以下の通り。
def graphQlSubscriptionSocket(publisher: ActorRef, ctx: Ctx)(implicit system: ActorSystem, materializer: ActorMaterializer, timeout: Timeout)
implicit
に指定されている引数はアクターまわりを扱う際に登場する定番引数なので、あまり詳しく説明しないが、ActorSystem
はアクターの起動などの管理を行うクラスで、ActorMaterializer
はアクターが実際に稼動するスレッドなどの管理を行っている。Timeout
は一定時間の応答がなかった場合に接続を切るためのものだろう。
重要なのは、publisher
である。publisher
の型はActorRef
、つまりアクターへの参照なので、ここにメッセージを送信したり、メッセージがここから送られてくるという想像ができる。(ctx
はGraphQLでいうところのコンテキストオブジェクトで、ここではあまり関係が無さそうだったので割愛する。)
graphQlSubscriptionSocket
では、以下の事を行っている。
- 新たに
SubscriptionActor
を起動している。 incoming
という名のSink
を定義している。outgoing
という名のSource
を定義している。incoming
とoutgoing
とを合成してFlow
に変換している(返り値)。
それぞれについて、順を追って見ていこう。まずは実装をここに引用するので、適宜見ながら読んでほしい。
import akka.NotUsed import akka.actor.{ActorRef, ActorSystem, PoisonPill, Props} import akka.http.scaladsl.model.ws._ import akka.stream.{ActorMaterializer, OverflowStrategy} import akka.stream.scaladsl._ import akka.util.Timeout import spray.json._ import scala.util._ trait SubscriptionSupport { import SubscriptionActor._ def graphQlSubscriptionSocket(publisher: ActorRef, ctx: Ctx)(implicit system: ActorSystem, materializer: ActorMaterializer, timeout: Timeout) = { val subscriptionActor = system.actorOf(Props(new SubscriptionActor(publisher, ctx))) // Transform any incoming messages into Subscribe messages and let the subscription actor know about it val incoming = Flow[Message] .collect { case TextMessage.Strict(input) => Try(input.parseJson.convertTo[Subscribe]) } .collect { case Success(subscription) => subscription } .to(Sink.actorRef[Subscribe](subscriptionActor, PoisonPill)) // connect the subscription actor with the outgoing WebSocket actor and transform a result into a WebSocket message. val outgoing = Source.actorRef[QueryResult](10, OverflowStrategy.fail) .mapMaterializedValue { outputActor => subscriptionActor ! Connected(outputActor) NotUsed } .map { msg: SubscriptionMessage => msg match { case result: QueryResult => TextMessage(result.json.compactPrint) case result: SubscriptionAccepted => TextMessage("subscription accepted.") } } Flow.fromSinkAndSource(incoming, outgoing) } }
subscriptionActor
graphQlSubscriptionSocket
は、まずsubscriptionActor
を作成している。引数としてpublisher
とctx
を渡しているので、おそらく更新されたときのデータを配ってくれるのだろうという検討がつく。subscriptionActor
については後述する。
incoming
// Transform any incoming messages into Subscribe messages and let the subscription actor know about it val incoming = Flow[Message] .collect { case TextMessage.Strict(input) => Try(input.parseJson.convertTo[Subscribe]) } .collect { case Success(subscription) => subscription } .to(Sink.actorRef[Subscribe](subscriptionActor, PoisonPill))
Flow
から始まっているのでFlow
であるかのような錯覚を覚えるが、最終的に.to(Sink.actorRef)
の形式になっているので全体としてはSink
になる。このSink
がやっている事は以下の通り。
Message
を受け取るが、TextMessage.Strict
であるもののみを選択的に受け付ける(そうでないものは捨てる)。- メッセージがJSONであると仮定してデシリアライズする。デシリアライズ先として
Subscribe
クラスが選ばれている。 - デシリアライズに失敗した場合は捨てる。
Subscribe
クラスに変換できたなら、subscriptionActor
に順次送信していく。(接続が途切れてストリームが終了した場合は、PoisonPill
という特殊なメッセージを送信する)PoisonPill
を受信したアクターは死ぬ。(命名がすごい)
Sink.actorRef
を呼び出しているのがミソで、Flow
で流れてきたメッセージを最終的にアクターに流し込むという構造にできる。アクターは何も返さないので、全体としてはSink
になる。
outgoing
// connect the subscription actor with the outgoing WebSocket actor and transform a result into a WebSocket message. val outgoing = Source.actorRef[QueryResult](10, OverflowStrategy.fail) .mapMaterializedValue { outputActor => subscriptionActor ! Connected(outputActor) NotUsed } .map { msg: SubscriptionMessage => msg match { case result: QueryResult => TextMessage(result.json.compactPrint) case result: SubscriptionAccepted => TextMessage("subscription accepted.") } }
incoming
と対になって実装されているのがoutgoing
。こちらは全体としてSource
になり、データストリームの源泉として振る舞う。このSource
がやっている事は以下の通り。
Source.actorRef
を使ってアクターを生成する。subscriptionActor
に対して、生成したアクターの参照を渡す。渡すときはConnected(参照)
の形をとっている。subscriptionActor
が先程生成したアクターに対してSubscriptionMessage
を投げてくれるので、WebSocketのTextMessage
へと変換して返す。
Source.actorRef
の挙動は結構難しいと思うので解説する。
前提知識として、Akka StreamsにおけるSource/Flow/Sinkという部品には、「部品が入力する値」「部品が出力する値」「部品を作成したタイミングで返す値」の3種類の型がある。ここでSource.actorRef
は、Source.actorRef[QueryResult]
として呼び出されているので、以下のような型を持つ。
Source
として振る舞うので、「部品が入力する値」に対応する型は無い。Source
として振る舞う上での「部品が出力する値」に対応する型はQueryResult
である。- 「部品を作成したタイミングで返す値」の型は、
ActorRef
である。
そして「部品を作成したタイミングで返す値」のことを、Akka Streams用語でMaterialized Value
と呼ぶ。コード中のmapMaterializedValue
は、このMaterialized Valueをマップしているので、部品自体が流す値には影響しない。
ではmapMaterializedValue
で何をしているかというと、Source.actorRef
によって得られたMaterialized ValueであるActorRef
を、「値はこのアクターに送ってください」とsubscriptionActor
に引き渡している。詳細はこの先登場するが、subscriptionActor
は、データが更新されたとき、Connected
で渡されたアクターにデータを送信してくれる。
.mapMaterializedValue { outputActor => subscriptionActor ! Connected(outputActor) NotUsed }
最後にNotUsed
を返しているのは、「もうこのMaterialized Valueは使わないです」という宣言である。
subscriptionActor
がこのoutputActor
にデータを送信してくれるようになるので、それを受け取り加工することでoutgoing
はSource
としての役割を果たすようになる。
Flow.fromSinkAndSource
SinkとSourceとが1つずつあるとき、これをFlowとみなすことができる。データが直接渡ってくるというFlowの一般的な関係ではないものの、Akka Streams的にはSinkとSourceがあったら合成してFlowだとみなすことができるのである。これを実現するのがFlow.fromSinkAndSource
で、読んで字の如くといった合成をしてくれる。
Flow.fromSinkAndSource(incoming, outgoing)
subscriptionのような非同期的になってしまう処理は、このようにアクターを経由したSinkとSourceとを別々に作り、最終的に合成してFlowにするというのが、一種のパターンになっているようだ。
この結果得られたFlowは、「subscriptionさせてください」というWebSocketメッセージを入力として受け取り、それに応じたデータ更新があったときにWebSocketメッセージの出力として返す、といった挙動になる。
SubscriptionActor
- 責務:
publisher
に参加する。Subscribeを受け付け、可能なクエリかどうか確認する。クエリを事前にPrepareする。クエリのPrepareができたことを報告する。接続あたり複数のsubscriptionを扱えるようにクエリを管理する。データ更新を受信し、対応したクエリを実行する。
かなり難しいことをしている!
subscriptionしたクエリと、発生するイベントとの間の対応を取るような調整を行っている。まだうまく理解できていない…………。
SubscriptionEventPublisher
- 責務: イベントが発生したとき、登録済みのアクターに対してイベントをブロードキャストする。登録とその解除を管理する。
Server.scala
で作成されている。さらにeventStorePublisher
を引数に取っているが、これはorg.reactivestreams.Publisher
であればなんでもよい。これがどんどん情報を発してくるので、購読したアクターに対して配ってあげるというのが、SubscriptionEventPublisher
の仕事。
val subscriptionEventPublisher = system actorOf Props(new SubscriptionEventPublisher(eventStorePublisher))
実装を見てみる。
import akka.actor.{Actor, ActorLogging, ActorRef, Terminated} import akka.stream.scaladsl.{Sink, Source} import akka.stream.{ActorMaterializer, OverflowStrategy} import generic.Event import org.reactivestreams.Publisher object SubscriptionEventPublisher { case object Join } class SubscriptionEventPublisher(publisher: Publisher[Event]) extends Actor with ActorLogging { import SubscriptionEventPublisher._ implicit val materializer = ActorMaterializer() var subscribers: Set[ActorRef] = Set.empty Source.fromPublisher(publisher) .buffer(100, OverflowStrategy.fail) .to(Sink.foreach(e => subscribers.foreach(_ ! e))) .run() def receive: Receive = { case Join => log.info(s"${sender()} joined.") subscribers += sender() context.watch(sender()) case Terminated(subscriber) => log.info(s"${sender()} was terminated.") subscribers -= subscriber } }
こちらは比較的簡単。publisher
をSource
として、バッファリングしつつ、登録済みのアクターへとブロードキャストしている。Join
やTerminated
を受信したときは、登録したり解除したりしている。
まとめ
こういう感じの動作になっている。Akka Streamsをうまく使ってブロードキャストや購読といった仕組みをうまく実装できているのが面白かった。Akka HTTPも良くできていて、普通にWebSocketを扱えるのがすごいと思う。