どうも。Windymeltです。 僕は今はてなサマーインターンで京都に滞在しており、id:masawadaが私立プログラミングキャンプのお誘いをしてくれて僕も参加してきたのでそのレポートです。
私立プログラミングキャンプははてなオフィスで行われたため迷わず会場に来ることができました。会場では大量のレッドブルとサイボウズウォーターが配給?されてメンバーの戦意を高揚させていました。
このキャンプ、ひとたび自己紹介が終わればいきなり作業に突入するタイプのイベントのようです。事前に何もする目標を決めていなかったので恐怖以外のなにものでもありませんでしたが、なんと同じScalaユーザであるUAdachiさんに遭遇したためScalaプログラミングをすることにしました。わいわい。
今回の技術的目標はScalaによるWebSocketを利用したXFD(eXtreme Frrdback Device)の作成でした。自前の開発環境が諸事情により使えないためTypesafe Activatorを利用して開発しました。UAdachiさんはサーバサイドに強い方ということなのでJenkinsサーバの建築などを行なっていただきました。
XFDを知らない方の為に説明すると、これはテストの結果などをちょっと楽しく通知しようというエクストリームプログラミング的な楽しみの一つです。電子工作やギミックなどを活用し、大々的にテストの結果をチームに共有することで修正の機運を高めようという試みです。もちろん、ギミックを見てひとときの癒しを得るという非常に良い効能があります。興味を持たれた方はArduino で Jenkins の XFD を作る - haradakiro's blogなどを参照なさると良いと思います。
今回作成したXFDは電子工作を使用しない、ブラウザに色でビルド状況を通知するシンプルなものでしたが、WebSocketやPlay Frameworkなどのイカした機能を用いて実装しています。もうCometなんか扱わなくても良い喜びがあります。
会場を貸してくださった株式会社はてなさん、レッドブルとサイボウズウォーターの配給を下さったサイボウズさん、ありがとうございました。
実装解説
開発環境はActivator 1.2.10 Akka 2.3.4 Play 2.3.3 Scala 2.11.1です。
まずWebSocketを扱うURLとWebSocketを扱うjavascriptを送り出すURL、そしてJenkinsがWebhookとして叩くURLが必要になるため、conf/routes
にルーティングの記述を行います。
GET /xfd/status controllers.Application.xfd GET /xfd/ws controllers.Application.xfdWs GET /xfd/jenkinsnotifyget controllers.Application.jenkinsNotify(status, job_name, build_number)
Jenkinsが叩くURLは本来はPOSTであるべきでしたが、うまく実装できずに時間が不足してきたためGETを用いて実装しています。
続いて、リクエストを処理するためのControllerをapp/controllers/Application.scala
に実装します。まずは全コードを載せ、続いて細かい説明を行います。
package controllers import play.api._ import play.api.mvc._ import play.api.data.Form import play.api.data.Forms._ import play.api.libs.json.Json import play.api.libs.iteratee._ import play.api.Play.current import scala.concurrent.duration.DurationInt import akka.actor._ import akka.pattern.ask import akka.util.Timeout import scala.concurrent.Future import play.api.libs.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global import models._ object Application extends Controller { val jenkinsReceiver = Akka.system.actorOf(Props[JenkinsReceiver]) def xfd = Action { Ok(views.html.iphone()) } def xfdWs = WebSocket.async { request => implicit val timeout: akka.util.Timeout = 1 minutes val connecting = jenkinsReceiver ? Connect("joining") connecting.mapTo[(Iteratee[String, _], Enumerator[String])] } def jenkinsNotify(status: String, job_name: String, build_number: String) = Action { implicit request => val query = Map("status" -> status, "job_name" -> job_name, "build_number" -> build_number) // throw jenkinsnotify jenkinsReceiver ! JenkinsNotify(query) Ok("ok") } } case class Connect(msg:String) case class JenkinsNotify(msg: Map[String, String]) case class Broadcast(msg: String) class JenkinsReceiver extends Actor { val (enumerator, channel) = Concurrent.broadcast[String] def receive = { case Connect(_) => val iteratee = Iteratee.foreach[String] { message => self ! Broadcast("user said " + message) }.map { _ => self ! Broadcast("user left") } sender ! (iteratee, enumerator) case JenkinsNotify(msg) => msg match { case query: Map[String, String] => // write parsing and broadcast code here println(query) self ! Broadcast("{ \"status\":\"" + query("status") + "\", \"name\":\"" + query("job_name") + "\", \"no\":\"" + query("build_number") + "\" }") case default => println("unrecognized message: " + default) } case Broadcast(msg) => channel.push(msg); } }
コードの中盤でViewを呼び出している部分があります。これはroute
で/xfd/status
にGET
が投げられた時に呼ばれるアクションとして定義されています。
def xfd = Action {
Ok(views.html.iphone())
}
Playではリクエストに対するアクションをAction
というDSL的な囲いの中で定義します。アクションの内容は、200 OK
でviews.html.iphone()
というViewを呼び出すものです。views.html.iphone
に対応するViewは、app/views/iphone.scala.html
に、以下のように定義されています。
@() @main("xfd") { main? <section id="message"></section> <script src="/assets/javascripts/ws.js"></script> }
簡単に説明すると、最上行は引数がないことを示しており、途中の行ではmain
テンプレートを"xfd"
という引数で呼び出しています。mainはhtml
タグやhead
タグなどの定義を共通して行う為に定義してあるものです。次にメッセージ表示のためのsection
タグが置かれ、次に/assets/javascripts/ws.js
というスクリプトが呼ばれています。このスクリプトはクライアントサイドで動作するもので、後程説明します。
app/controllers/Application.scala
に戻りましょう。
case class Connect(msg:String) case class JenkinsNotify(msg: Map[String, String]) case class Broadcast(msg: String)
これらのケースクラスは、Actorに処理を投げるときにデータのカプセルとして使います。Akkaでのメッセージパッシングは通例ケースクラスを用いて行います。Connect
はWebSocketの接続時に、JankinsNotify
はJenkinsから飛んできたWebhookを渡すときに中間のデータホルダとして、Broadcastは接続している全てのWebSocketにメッセージを投げるときに使います。
JenkinsからのWebhookの受付部分を見てみましょう。
def jenkinsNotify(status: String, job_name: String, build_number: String) = Action { implicit request => val query = Map("status" -> status, "job_name" -> job_name, "build_number" -> build_number) // throw jenkinsnotify jenkinsReceiver ! JenkinsNotify(query) Ok("ok") } }
受け取ったクエリパラメータをJenkinsNotifyクラスにカプセル化してJenkinsReceiverに渡しています。
WebSocketの受付部分を見てみましょう。
val jenkinsReceiver = Akka.system.actorOf(Props[JenkinsReceiver]) def xfd = Action { Ok(views.html.iphone()) } def xfdWs = WebSocket.async { request => implicit val timeout: akka.util.Timeout = 1 minutes val connecting = jenkinsReceiver ? Connect("joining") connecting.mapTo[(Iteratee[String, _], Enumerator[String])] }
xfd
メソッドはもう見ましたが、その前後を見ていきます。まずJenkinsReceiver
クラスで定義されたアクターを作成してjenkinsReceiver
という名前に割り当てています。ScalaでActorを使うのに必要なAkkaなどはPlay Frameworkで半自動的に定義さているので、特に別途の設定を行う必要が無ければそのまま使うことができます。
xfdWs
メソッドを見てみましょう。このメソッドはクライアントがWebSocketで接続を開こうとするときに呼ばれるコードです。WebSocketの通信はActionでは処理できないため、Action
ではなくWebSocket.async
で処理しているのが分かると思います。requestが渡ってきますが、処理には必要でないので使っていません。
メソッド1行目ではアクターにメッセージを投げた時にどのくらい応答が無ければタイムアウトとしてエラーを返すかを定義しています。WebSocketのタイムアウトとは関係のない定義です。そもそもWebSocketにタイムアウトはありません。たぶん。
メソッド2行目でjenkinsReceiver
にConnectケースクラスを投げて返答させます。jenkinsReceiver ? Connect("joining")
はjenkinsReceiver.ask(Connect("joining"))
の構文糖で、import akka.pattern.ask
することで使えるようになります。またask
は非同期で行われるので処理は次に進みます。
メソッド3行目でメッセージの返答を(Iteratee[String, _], Enumerator[String])
という型で待ち受けるよう定義し、それをブロックの戻り値にしています。WebSocket.async
はFuture[(Iteratee[String, _], Enumerator[String])]
を受けることになります。(WebSocket.async
はdeprecatedになっているようです。tryAccept
を使えと怒られました。これはFuture[Either[Result, (Iteratee[A, _], Enumerator[A])]]
を受けます。)
ここでIteratee
とEnumerator
という厄介な概念が登場するのですが、Iteratee
は入力を担当し、Enumerator
は出力を担当しています。ストリーミングなどの複雑な処理をを容易に行うために利用されているようなのですが、僕も良く分かっていません。
いよいよアクターに移ります。
class JenkinsReceiver extends Actor { val (enumerator, channel) = Concurrent.broadcast[String] def receive = { case Connect(_) => val iteratee = Iteratee.foreach[String] { message => self ! Broadcast("user said " + message) }.map { _ => self ! Broadcast("user left") } sender ! (iteratee, enumerator) case JenkinsNotify(msg) => msg match { case query: Map[String, String] => // write parsing and broadcast code here println(query) self ! Broadcast("{ \"status\":\"" + query("status") + "\", \"name\":\"" + query("job_name") + "\", \"no\":\"" + query("build_number") + "\" }") case default => println("unrecognized message: " + default) } case Broadcast(msg) => channel.push(msg); } }
val (enumerator, channel) = Concurrent.broadcast[String]
の部分では、WebSocketに接続している全てのクライアントに対して送信するためのチャンネルを取得しています。書き方次第では個別に通信できそうなのですが、このあたりの仕組みは後ほど調べたいと思います。
def receive ...
の部分はメッセージを受信した時の処理をパターンマッチを用いて記述しています。Connect(_)
を受け取った時は"foreachでメッセージを受けて自分に対してBroadcast
を送り、map
で相手が接続を終了した時はBroadcasttを投げる"という処理をiteratee
として定義しています。これを送信元にenumeratorと組にして返しています。
JenkinsNotify
を受け取った時はmsg: Map[String, String]をケースクラスから取り出し、self
に対してmsgをJSONに変換したものをBroadcast
メッセージに載せて送信しています。
さて、Broadcast
メッセージを受け取った時は接続している全てのWebSocketに対してメッセージを送信しています。
クライアント側のjavascriptではJSONを受信した時にその内容で画面の色を変える操作を行なっています。メッセージが送られてくるとws.onmessage
が呼ばれます。
var ws = new WebSocket("ws://5384a561.ngrok.com/xfd/ws"); ws.onopen = function () { $('body').text('opened'); }; ws.onmessage = function (message) { var parsed = JSON.parse(message.data); if (parsed.status == 'running') $('body').attr('style', 'background-color: cyan'); if (parsed.status == 'success') $('body').attr('style', 'background-color: green'); if (parsed.status == 'failed') $('body').attr('style', 'background-color: red'); }; ws.onerror = function (evt) { $('#message').append(' Error occurred: ' + evt + "\n"); };
これによりJenkinsのビルドの状況をWabhookするとクライアントに状況が通知されるシステムが出来ました。