ScalaのPlay Frameworkでは非同期処理を扱う時にIteratee/Enumeratorが登場するが、先日自分でプログラミングしていてよく分からない概念だと感じたのでメモ代わりに調べた事を纏める。
Iterateeとは入力されたデータを順に処理するための抽象的な仕組みらしい。Iterateeは繰返しのそれぞれのステップで、「次の値」「データ入力が使えないこと」「繰返し処理が完了したこと」を示す3つの値をとる。
英語では、'employ'と'employee'のように、'-ee'と続く名詞は受動態を示しているので、「繰返し構造の抽象化」であるイテレータを能動受動を逆転させた「繰返される構造の抽象化」といったところらしい。EnumeratorとEnumerateeはIterateeと対を成す概念だ。
Enumeratorはストリームを生産するものであり、Enumerateeはストリームを消費するものである。 Enumerateeはイミュータブルで再利用でき、エラーや結果、計算を内包する概念である。
Iterator to Iteratee
この節はおよそ次の記事の劣化版翻訳であり、コードサンプルも拝借しています。 Understanding Play2 Iteratees for Normal Humans - Mandubian Blog
まずIteratee
を理解するためにはIterator
の概念について復習する必要がある。JavaライクにIteratorを書くと次のようなコードになるだろう:
val l = List(1, 234, 455, 987) var total = 0 var it = l.iterator while( it.hasNext ) { total += it.next }
このような繰返し構造のコードを書くとき、プログラマはいくつかの点を意識しながらコードを書く事になる。
- 繰返して処理されるストリームの状態(空ではないか?エラーはないか?)
- 現在の状態と次の状態を繋ぐのに必要なコンテキスト(ここでは合計値を示す
total
) - コンテキストに対して適用する作用(ここでは加算を行っている)
もっとScalaらしく書くと次のようなコードになる:
for( item <- l ) { total += item }
この時点でIteratorとはおさらばできたが、より関数型言語らしく書くこともできるだろう:
l.foreach{ item => total += item }
ここでforEach
関数が登場する。これは無名関数を受け取りリストを横断しながら処理を行う関数だ。この場合の無名関数はコンテキスト(ここではtotal
)に諸要素を足し合わせている。
さて、無名関数はScalaではファーストオブジェクトなので変数に格納すれば使い回して再利用することができる。
val l = List(1, 234, 455, 987) val l2 = List(134, 664, 987, 456) var total = 0 def step(item: Int) = total += item l foreach step total = 0 l2 foreach step
賢明なるあなたはきっとこう言うだろう。「クソみたいな設計だな。その関数は副作用があるし、変数を使っていて全くクソな設計だ。しかももう一度関数を呼ぶときにはtotal
を0に初期化してやらないといけないじゃないか!」と。
あなたの考えは全くもって正しい。まず副作用を持つ関数は非常に危険だ。関数実行に伴って関数外部の何かの状態を変更してしまうからだ。この「状態」は関数に対して排他的ではなく、しかも他のコードによって変更されるかもしれない。この状態の変更という危険は、潜在的に他のスレッドからの変更も含まれることになる。副作用を持つ関数は、クリーンで頑強な設計を保つという観点からはお奨めできるものではなく、Scalaのような関数型言語は、IO操作などのどうしても必要な場合という程度にまで、副作用を持つ関数を減らそうとしている。そして次に、ミュータブルな変数も同様に危険性を孕んでいる。もしあなたのコードがいくつかのスレッドで動作したとき、二つのスレッドが同じ変数の値を変更しようとしたら一体どちらが勝つのだろうか?上記の例ではシンクロナイゼーション、つまり変数を書き換える間他のスレッドをブロックする必要性が出てくる。このことは、Play2の存在意義のうちの一つである「ノンブロッキングなWebアプリケーション」という概念を破壊してしまうのだ。
コードをイミュータブルに、副作用を排除して書き直す
def foreach(l: List[Int]) = { def step(l: List[Int], total: Int): Int = { l match { case List() => total case List(elt) => total + elt case head :: tail => step(tail, total + head) } } step(l, 0) } foreach(l)
ちょっとマシになったことが分かるかな?少なくとも次の点について注目してほしい。
- イミュータブルな変数
var total
が消滅したこと。 step
関数は繰返しのそれぞれのステップで実行されるが、以前とは異なるものとなっている。以前はstep
は繰返しの状態をも管理し、次のような動作をしていた。- リストが空なら現在の
total
を返す - リスト要素の数が1つであれば、
total
とelt
を加算して返す - リストが2つ以上の要素を含んでいるなら、リストの
tail
と、total + head
という新たなtotal
を引数としてstep
を呼び出す
- リストが空なら現在の
繰返しのそれぞれの過程では、直前の繰返しの結果に依存しつつ、step
は2つの状態を選択することができる。
- まだ多くの要素があるので繰返しを継続する
- リストの末尾に到達した、もしくは要素が全く無いので繰返しを停止する
次の点にも注目してほしい。
step
は末尾再帰関数であり、(再帰の終わりにコールスタックを全て展開せず直ぐにリターンする) 末尾再帰関数はスタックオーバーフローを防ぎ、先に紹介したイテレータ付きのコードのように振る舞うstep
はリストの残りの要素と新たなtotal
を次のstep
に送り込むstep
は全く副作用を伴わずに合計値を返す
そう、このコードはリストの一部をそれぞれのstep
で(要素への参照を、ではあるが)コピーし直しているから、ややより多くのメモリを消費するが、副作用を持たないしイミュータブルなデータ構造のみを使っている。これによりコードは非常に頑強になり、何の問題も無しに分散可能に(distributable)なる。
また、Scalaのコレクションが提供する素晴らしいメソッドを利用することで、非常に短くコードを書くことができる:
l.foldLeft(0){ (total, elt) => total + elt }
ここまでのまとめ
この節では、ステップごとに伝播していくイミュータブルデータ構造をベースにして繰返し(iteration)を検討してみた。この観点からは、繰返しは次の要素を内包する概念である:
- 前のステップから情報を受け取る:コンテキストと状態(state)
- 現在/残りの要素(群)を取得する
- 新たな状態とコンテキストを残りの要素群から導く
- 新たな状態とコンテキストを次のステップに伝播する
Iterator, Iterateeへ
さあ、繰り返しについてはもうはっきり理解できたと思う。さあ、Iterateeへと戻ろう。
先程の繰返しのメカニズムを一般化して、こういう風に書きたい状況を想像してほしい。
def sumElements(...) = ... def prodElements(...) = ... def printElements(...) = ... l.iterate(sumElements) l.iterate(prodElements) l.iterate(printElements)
そう、ScalaのコレクションAPIを使えば、色々な事が出来るようになる。
最初の繰り返しと次の繰返しとを結合したい時を想像してほしい。
def groupElements(...) = ... def printElements(...) = ... l.iterate(groupElements).iterate(printElements)
この繰返しをコレクション以外の物に適用したい時を想像して欲しい。
- ファイルやネットワーク接続、データベース接続などの少しずつ生成されるデータの流れや
- アルゴリズムによって生成されるデータフローや
- スケジューラやアクターのような非同期にデータを生成する存在からのデータフロー
Iterateesはまさにこのような時のための存在なのだ。
試しに、先程の合計の繰返しをIterateeで書くとこうなる。
val enumerator = Enumerator(1, 234, 455, 987) enumerator.run(Iteratee.fold(0){ (total, elt) => total + elt }
Ok, これはあまり前のコードとそう変わらないし、大した事をしているようにも見えない。 でも私を信用してほしい。もっとすごいこともできる。 少なくとも、複雑には見えないでしょう?
でも見れば分かる通り、IterateeはEnumeratorと共に使われている。二つの概念は固く結び付いているのだ。
ではこれらの考え方に一歩一歩近づいていこう。
><> Enumeratorについて ><>
Enumerator
はコレクションや配列よりもさらに一般化された概念である
今まで、我々は繰返しの中でコレクションを使用してきた。しかし先に説明した通り、我々はもっと一般的な、単にデータの塊(チャンク)を即座にもしくは非同期に将来生成するものを繰返して処理できる。
Enumerator
はこの目的のために設計されている。
いくつかの簡単なEnumerator
の例を示そう:
// an enumerator of Strings val stringEnumerator: Enumerator[String] = Enumerate("alpha", "beta", "gamma") // an enumerator of Integers val integerEnumerator: Enumerator[Int] = Enumerate(123, 456, 789) // an enumerator of Doubles val doubleEnumerator: Enumerator[Double] = Enumerate(123.345, 456.543, 789.123) // an Enumerator from a file val fileEnumerator: Enumerator[Array[Byte]] = Enumerator.fromFile("myfile.txt") // an Enumerator generated by a callback // it generates a string containing current time every 500 milliseconds // notice (and forget for the time being) the Promise.timeout which allows non-blocking mechanism val dateGenerator: Enumerator[String] = Enumerator.generateM( play.api.libs.concurrent.Promise.timeout( Some("current time %s".format((new java.util.Date()))), 500 ) )
Enumerator
は静的に型付けされたデータの塊の生産者である。
Enumerator[E]
はE
型のデータの塊を生成し、次に挙げる3種類のうちのどれかになりうる:
Input[E]
はE
型のデータの塊である。例えばInput[Pizza]
はPizza
の塊である。Input.Empty
はenumeratorが空であることを示している。例えば、空のファイルをストリーミングしているEnumerator
である。Input.EOF
はenumeratorが終わりに到達したことを示している。例えば、ファイルをストリミングしてファイルの終わりに到達したEnumerator
である。
チャンクの種類と上で説明した状態(まだデータがある/ない/もう要素がない)とを比較することができる。
実際、Enumerator[E]
はInput[E]
を含んでいるためInput[E]
を中に入れることができる:
// create an enumerator containing one chunk of pizza val pizza = Pizza("napolitana") val enumerator: Enumerator[Pizza] = Enumerator.enumInput(Input.el(pizza)) // create an enumerator containing no pizza val enumerator: Enumerator[Pizza] = Enumerator.enumInput(Input.Empty)
Enumerator
はノンブロッキングな生産者である
Play2の裏を支える理念は、知ってるかもしれないが、完全にノンブロッキングで非同期であることだ。そのように、Enumerator
/Iteratee
はこの哲学を反映している。Enumerator
はチャンクを完全に非同期でノンブロッキングな方法で生成する。これはEnumerator
のコンセプトははじめからアクティブプロセスやデータのチャンクを生成するバックグラウンドタスクと結び付いていたわけではないことを意味している。
上に挙げたコードのdateGenerator
がまさしく非同期でノンブロッキングなEnumerator
/Iteratee
の性質を表わしているのを覚えていますか?
// an Enumerator generated by a callback // it generates a string containing current time every 500 milliseconds // notice the Promise.timeout which provide a non-blocking mechanism val dateGenerator: Enumerator[String] = Enumerator.generateM( play.api.libs.concurrent.Promise.timeout( Some("current time %s".format((new java.util.Date()))), 500 ) )
Promise
とは何か?
しゃんと説明するにはもう一つ記事が必要になってしまうのだが、とにかくPromise
という名前はそれがする事に対応していると言えるだろう。Promise[String]
が意味する所とは、《未来に向けてString
を提供する(か、エラーを返す)》ただそれだけだ。そして、Promise
は現行のスレッドをブロックせず、ただ流していく。
Enumerator
はその生成したデータの消費者を必要とする
ノンブロッキングであるというその性質のために、もし誰もその生成されたチャンクを消費しなければ、Enumertor
は何もブロックせず、そして何も隠れたランタイムリソースを消費しない。
つまり、Enumerator
がデータの塊を生成するのは、恐らくそれを消費する者が居るときだけだ。
さてEnumerator
が生成したデータの塊を消費するのは一体誰なのだろう?
そう、まさしくあなたはここでIteratee
の概念を導いたことになる。
><> Iteratee
について ><>
Iteratee
はEnumerator
を繰返して処理できる一般的な「何か」
Let’s be windy for one sentence:
Iteratee
は、繰返しの概念を純粋関数型プログラミング向けに一般化して置き換えたものだ。Iterator
がコレクションを繰返すために作られているのに対して、Iteratee
はEnumerator
が繰返されるのを待つ一般的な存在である。
Iterator
とIteratee
との違いが分かるだろうか? 分からなくても大丈夫だ。ただこれだけの事を覚えていれば、それでいい。
Iteratee
はEnumerator
(など)が生成したデータの塊を繰返し処理する一般的な存在であるIteratee
はそれが繰返し処理するEnumerator
とは独立して作られ、Enumerator
はそれに提供されるIteratee
はイミュータブル且つステートレスで、異なるEnumerator
へ完全に再利用可能である
これこそが、
Iteratee
はEnumeratorに適用されるか
Enumerator`を走査する と言える理由なのだ。
Enumerator[Int]
の全要素の合計を計算する先程の例を覚えているだろうか?
ここに示すのはIteratee
は一度作成されると異なるEnumerator
に対して複数回再利用できることを示す同じ内容のコードだ。
val iterator = Iteratee.fold(0){ (total, elt) => total + elt } val e1 = Enumerator(1, 234, 455, 987) val e2 = Enumerator(345, 123, 476, 187687) // we apply the iterator on the enumerator e1(iterator) // or e1.apply(iterator) e2(iterator) // we run the iterator over the enumerator to get a result val result1 = e1.run(iterator) // or e1 run iterator val result2 = e2.run(iterator)
Enumerator.apply
とEnumerator.run
はやや異なる関数だが、これについては後程触れることにする。
Iteratee
はデータの塊を能動的に消費する
標準では、Iteratee
は最初のデータの塊を待ち、その直後に繰返しの機構を開始する。Iteratee
は処理が完了したと判断するまでデータの消費を続ける。
一度初期化されれば、Iteratee
は完全な繰返しの過程に完全に応答できるようになり、停止する時を判断できる。
// creates the iteratee val iterator = Iteratee.fold(0){ (total, elt) => total + elt } // creates an enumerator val e = Enumerator(1, 234, 455, 987) // this injects the enumerator into the iteratee // = pushes the first chunk of data into the iteratee enumerator(iterator) // the iteratee then consumes as many chunks as it requires // don't bother about the result of this, we will explain later
上で説明されている通り、Enumerator
はデータの塊の生産者であり、消費者がデータの塊を消費することを期待している。
消費/繰返しされるためには、Enumerator
はIteratee
に注入/接続されなければならない。より正確に言うと、最初のデータの塊はIteratee
に注入/プッシュされなければならない。
その性質上Iteratee
はEnumerator
の生産速度に依存してしまう。もしEnumerator
が低速ならば、Iteratee
も低速になる。
Iteratee
とEnumerator
の関係は支配逆転(Inversion of control)や依存性注入(Dependency Injection)パターンに合わせたものであると捉えることもできるということを分かっていただきたい。
Iteratee
は「1-chunk-loop」関数である
Iteratee
はチャンクを、繰返しが終了したと判断するまで一つづつ消費する。
実際、Iteratee
の現実のスコープは一つのチャンクの処理に限られている。これがIteratee
が一つのデータの塊を消費できる関数として定義できる理由だ。
Iteratee
は静的に型付けされた塊を受け付け、静的に型付けされた結果を計算する
Iterator
が、それを生成したコレクションからやって来るデータの塊を繰返し処理するのに対し、Iteratee
は若干さらに意欲的だ。データの塊を消費する間、何かを計算することができるのだ。
この証左として、Itarator
のシグネチャは以下に示す通りである。
trait Iteratee[E, +A] // E is the type of data contained in chunks. So it can only be applied on a Enumerator[E] // A is the result of the iteration
最初の例示に戻ってみよう。Enumerator[Int]
が生成した全ての整数の合計を計算するものだ。
// creates the iteratee val iterator = Iteratee.fold(0){ (total, elt) => total + elt } val e = Enumerator(1, 234, 455, 987) // runs the iteratee over the enumerator and retrieve the result val total: Promise[Int] = enumerator run iterator
run
の使い方に注目してほしい。非同期の世界に我々は居るから、その結果は合計そのものではなく合計のPromise[Int]
が返っているのが分かると思う。 本当の合計を得るには、scalaの同時処理をブロックするAwait._
関数を使うことができる。しかしこれはブロッキングなAPIだから良くないやり方だ。Play2は完全に非同期でノンブロッキングだから、一番良いやり方はpromiseをPromise.map
/flatMap
を使って伝播させることだ。
しかし結果は強制的でない。例として、全ての消費されたチャンクをただprintln
してみよう。
// creates the iteratee val e = Enumerator(1, 234, 455, 987) e(Iteratee.foreach( println _ )) // or e.apply(Iteratee.foreach( println _ )) // yes here the usage of _ is so trivial that you shall use it
返ってくる結果はプリミティブな型でなくてもよく、以下の例のように全てのチャンクを連結したリストでもよい。
val enumerator = Enumerator(1, 234, 455, 987) val list: Promise[List[Int]] = enumerator run Iteratee.getChunks[Int]
Iteratee
は繰返しにかけてイミュータブルなコンテキストと状態を伝播できる
最終的な合計を得るには、Iteratee
は繰返しの過程で部分的な合計を伝播していかなければならない。
これはIterateeが直前のステップからコンテキスト(例では直前の合計)を受け取ることができ、新たなコンテキストを現在のデータの塊から計算でき(新たな合計 = 直前の合計 + 現在の要素)、最終的にこのコンテキストを次のステップ(次のステップに行く必要があるとき)に伝播させることができるという事を意味している。
単に、Iteratee
はステートマシンである
まあそれはそれでいいとして、どうやってIteratee
は繰り返しを停止すべきかを判断するのだろうか?
エラーが発生したりEOFが存在したり、Enumerator
の終端に到達したときに何が起こるのだろう?
それ故にコンテキストに加えて、Iteratee
は前の状態も受け取り、何をすべきかを考え、次のステップに移るための新たな状態を潜在的に計算する。
さあ、前述した従来の繰り返しでの「状態」を思い出してほしい。繰り返しの過程においてIteratee
がとりうる状態として、ほぼ同じ二つの状態が存在する。
Cont
状態 : 繰り返しが次のチャンクとともに継続可能で潜在的に新たなコンテキストを計算するDone
状態 : 処理の終端に到達したことを通知し、結果となるコンテキストの値を返す
そして3つめの状態ははっきり論理的に見える:
Error
状態 : 現在のステップにおいてエラーが発生したことを通知し繰り返しを中断する
この観点からは、
Itatetee
はDone
かError
という終着条件を検出するまでCont
状態を繰り返すことだけを任された単なるステートマシンだと捉えることができます。
Iteratee
がとる状態Done
/Error
/Cont
も同じくIteratee
である
ここで、Iteratee
は1チャンクごとにループする関数であることと、その主な目的はある状態を次の状態に遷移させることだということを確認してほしい。
そして、3つの【状態】のIteratee
が存在する。
Done[E, A](a: A, remaining: Input[E])
- 前のステップから受け取ったコンテキストを表わす
a:A
。 - 次のチャンクを表わす
remaining: Input[E]
。
Error[E](msg: String, input: Input[E])
- とても簡潔にこう理解することもできる: エラーメッセージとそれを失敗させた入力。
Cont[E, A](k: Input[E] => Iteratee[E, A])
これは、Input[E]
を取り他のIteratee[E, A]
を返す関数から構成されているが故に、最も複雑な【状態】である。Input[E] => Iteratee[E, A]
という関数が、一つの入力を消費しDone
かError
の状態に到達するまで〈他の入力を受け取り他の新たな状態/iterateeを返す〉新たな状態/iterateeを返す*1...ための簡潔なよい方法だということは、理論の深みにあまり近付かずとも簡単に分かると思う。
この構造は繰り返しの機構を(典型的な関数型のやり方で)確実に成り立たせることができる。
さて、これまでの解説はとても多くの情報に感じたはずだ。きっとあなたは、何故こんなに私が一から十まで説明するのか疑問に思っていることだろう。これは全て、このことが理解できればIteratee
を自在に作成する方法を理解できるようになるからなのだ。
例を示すためにEnumerator[Int]
の最初の二つの要素の合計を計算するIteratee
を書いてみよう。
// Defines the Iteratee[Int, Int] def total2Chunks: Iteratee[Int, Int] = { // `step` function is the consuming function receiving previous context (idx, total) and current chunk // context : (idx, total) idx is the index to count loops def step(idx: Int, total: Int)(i: Input[Int]): Iteratee[Int, Int] = i match { // chunk is EOF or Empty => simply stops iteration by triggering state Done with current total case Input.EOF | Input.Empty => Done(total, Input.EOF) // found one chunk case Input.El(e) => // if first or 2nd chunk, call `step` again by incrementing idx and computing new total if(idx < 2) Cont[Int, Int](i => step(idx+1, total + e)(i)) // if reached 2nd chunk, stop iterating else Done(total, Input.EOF) } // initiates iteration by initialize context and first state (Cont) and launching iteration (Cont[Int, Int](i => step(0, 0)(i))) } // Using it val promiseTotal = Enumerator(10, 20, 5) run total2Chunks promiseTotal.map(println _) => prints 30
受け取ったチャンクの型に依存して新たな
State
/Iteratee
を返すItaretee
を書くことは、それぞれのステップで何をすべきかを選択するのと大差ないのだと、この例で理解できるはずだ。
参考記事
Iteratee - Wikipedia, the free encyclopedia
このスライドを見ると良く理解できる。かも。
Introduction to Iteratees (Scala)