ScalaのPlay Frameworkでは非同期処理を扱う時にIteratee/Enumeratorが登場するが、先日自分でプログラミングしていてよく分からない概念だと感じたのでメモ代わりに調べた事を纏める。
Iterateeとは入力されたデータを順に処理するための抽象的な仕組みらしい。Iterateeは繰返しのそれぞれのステップで、「次の値」「データ入力が使えないこと」「繰返し処理が完了したこと」を示す3つの値をとる。
英語では、'employ'と'employee'のように、'-ee'と続く名詞は受動態を示しているので、「繰返し構造の抽象化」であるイテレータを能動受動を逆転させた「繰返される構造の抽象化」といったところらしい。EnumeratorとEnumerateeはIterateeと対を成す概念だ。
Enumeratorはストリームを生産するものであり、Enumerateeはストリームを消費するものである。 Enumerateeはイミュータブルで再利用でき、エラーや結果、計算を内包する概念である。
Iterator to Iteratee
この節はおよそ次の記事の劣化版翻訳であり、コードサンプルも拝借しています。 Understanding Play2 Iteratees for Normal Humans - Mandubian Blog
まずIterateeを理解するためにはIteratorの概念について復習する必要がある。JavaライクにIteratorを書くと次のようなコードになるだろう:
val l = List(1, 234, 455, 987) var total = 0 var it = l.iterator while( it.hasNext ) { total += it.next }
このような繰返し構造のコードを書くとき、プログラマはいくつかの点を意識しながらコードを書く事になる。
- 繰返して処理されるストリームの状態(空ではないか?エラーはないか?)
- 現在の状態と次の状態を繋ぐのに必要なコンテキスト(ここでは合計値を示す
total) - コンテキストに対して適用する作用(ここでは加算を行っている)
もっとScalaらしく書くと次のようなコードになる:
for( item <- l ) { total += item }
この時点でIteratorとはおさらばできたが、より関数型言語らしく書くこともできるだろう:
l.foreach{ item => total += item }
ここでforEach関数が登場する。これは無名関数を受け取りリストを横断しながら処理を行う関数だ。この場合の無名関数はコンテキスト(ここではtotal)に諸要素を足し合わせている。
さて、無名関数はScalaではファーストオブジェクトなので変数に格納すれば使い回して再利用することができる。
val l = List(1, 234, 455, 987) val l2 = List(134, 664, 987, 456) var total = 0 def step(item: Int) = total += item l foreach step total = 0 l2 foreach step
賢明なるあなたはきっとこう言うだろう。「クソみたいな設計だな。その関数は副作用があるし、変数を使っていて全くクソな設計だ。しかももう一度関数を呼ぶときにはtotalを0に初期化してやらないといけないじゃないか!」と。
あなたの考えは全くもって正しい。まず副作用を持つ関数は非常に危険だ。関数実行に伴って関数外部の何かの状態を変更してしまうからだ。この「状態」は関数に対して排他的ではなく、しかも他のコードによって変更されるかもしれない。この状態の変更という危険は、潜在的に他のスレッドからの変更も含まれることになる。副作用を持つ関数は、クリーンで頑強な設計を保つという観点からはお奨めできるものではなく、Scalaのような関数型言語は、IO操作などのどうしても必要な場合という程度にまで、副作用を持つ関数を減らそうとしている。そして次に、ミュータブルな変数も同様に危険性を孕んでいる。もしあなたのコードがいくつかのスレッドで動作したとき、二つのスレッドが同じ変数の値を変更しようとしたら一体どちらが勝つのだろうか?上記の例ではシンクロナイゼーション、つまり変数を書き換える間他のスレッドをブロックする必要性が出てくる。このことは、Play2の存在意義のうちの一つである「ノンブロッキングなWebアプリケーション」という概念を破壊してしまうのだ。
コードをイミュータブルに、副作用を排除して書き直す
def foreach(l: List[Int]) = { def step(l: List[Int], total: Int): Int = { l match { case List() => total case List(elt) => total + elt case head :: tail => step(tail, total + head) } } step(l, 0) } foreach(l)
ちょっとマシになったことが分かるかな?少なくとも次の点について注目してほしい。
- イミュータブルな変数
var totalが消滅したこと。 step関数は繰返しのそれぞれのステップで実行されるが、以前とは異なるものとなっている。以前はstepは繰返しの状態をも管理し、次のような動作をしていた。- リストが空なら現在の
totalを返す - リスト要素の数が1つであれば、
totalとeltを加算して返す - リストが2つ以上の要素を含んでいるなら、リストの
tailと、total + headという新たなtotalを引数としてstepを呼び出す
- リストが空なら現在の
繰返しのそれぞれの過程では、直前の繰返しの結果に依存しつつ、stepは2つの状態を選択することができる。
- まだ多くの要素があるので繰返しを継続する
- リストの末尾に到達した、もしくは要素が全く無いので繰返しを停止する
次の点にも注目してほしい。
stepは末尾再帰関数であり、(再帰の終わりにコールスタックを全て展開せず直ぐにリターンする) 末尾再帰関数はスタックオーバーフローを防ぎ、先に紹介したイテレータ付きのコードのように振る舞うstepはリストの残りの要素と新たなtotalを次のstepに送り込むstepは全く副作用を伴わずに合計値を返す
そう、このコードはリストの一部をそれぞれのstepで(要素への参照を、ではあるが)コピーし直しているから、ややより多くのメモリを消費するが、副作用を持たないしイミュータブルなデータ構造のみを使っている。これによりコードは非常に頑強になり、何の問題も無しに分散可能に(distributable)なる。
また、Scalaのコレクションが提供する素晴らしいメソッドを利用することで、非常に短くコードを書くことができる:
l.foldLeft(0){ (total, elt) => total + elt }
ここまでのまとめ
この節では、ステップごとに伝播していくイミュータブルデータ構造をベースにして繰返し(iteration)を検討してみた。この観点からは、繰返しは次の要素を内包する概念である:
- 前のステップから情報を受け取る:コンテキストと状態(state)
- 現在/残りの要素(群)を取得する
- 新たな状態とコンテキストを残りの要素群から導く
- 新たな状態とコンテキストを次のステップに伝播する
Iterator, Iterateeへ
さあ、繰り返しについてはもうはっきり理解できたと思う。さあ、Iterateeへと戻ろう。
先程の繰返しのメカニズムを一般化して、こういう風に書きたい状況を想像してほしい。
def sumElements(...) = ... def prodElements(...) = ... def printElements(...) = ... l.iterate(sumElements) l.iterate(prodElements) l.iterate(printElements)
そう、ScalaのコレクションAPIを使えば、色々な事が出来るようになる。
最初の繰り返しと次の繰返しとを結合したい時を想像してほしい。
def groupElements(...) = ... def printElements(...) = ... l.iterate(groupElements).iterate(printElements)
この繰返しをコレクション以外の物に適用したい時を想像して欲しい。
- ファイルやネットワーク接続、データベース接続などの少しずつ生成されるデータの流れや
- アルゴリズムによって生成されるデータフローや
- スケジューラやアクターのような非同期にデータを生成する存在からのデータフロー
Iterateesはまさにこのような時のための存在なのだ。
試しに、先程の合計の繰返しをIterateeで書くとこうなる。
val enumerator = Enumerator(1, 234, 455, 987) enumerator.run(Iteratee.fold(0){ (total, elt) => total + elt }
Ok, これはあまり前のコードとそう変わらないし、大した事をしているようにも見えない。 でも私を信用してほしい。もっとすごいこともできる。 少なくとも、複雑には見えないでしょう?
でも見れば分かる通り、IterateeはEnumeratorと共に使われている。二つの概念は固く結び付いているのだ。
ではこれらの考え方に一歩一歩近づいていこう。
><> Enumeratorについて ><>
Enumeratorはコレクションや配列よりもさらに一般化された概念である
今まで、我々は繰返しの中でコレクションを使用してきた。しかし先に説明した通り、我々はもっと一般的な、単にデータの塊(チャンク)を即座にもしくは非同期に将来生成するものを繰返して処理できる。
Enumeratorはこの目的のために設計されている。
いくつかの簡単なEnumeratorの例を示そう:
// an enumerator of Strings val stringEnumerator: Enumerator[String] = Enumerate("alpha", "beta", "gamma") // an enumerator of Integers val integerEnumerator: Enumerator[Int] = Enumerate(123, 456, 789) // an enumerator of Doubles val doubleEnumerator: Enumerator[Double] = Enumerate(123.345, 456.543, 789.123) // an Enumerator from a file val fileEnumerator: Enumerator[Array[Byte]] = Enumerator.fromFile("myfile.txt") // an Enumerator generated by a callback // it generates a string containing current time every 500 milliseconds // notice (and forget for the time being) the Promise.timeout which allows non-blocking mechanism val dateGenerator: Enumerator[String] = Enumerator.generateM( play.api.libs.concurrent.Promise.timeout( Some("current time %s".format((new java.util.Date()))), 500 ) )
Enumeratorは静的に型付けされたデータの塊の生産者である。
Enumerator[E]はE型のデータの塊を生成し、次に挙げる3種類のうちのどれかになりうる:
Input[E]はE型のデータの塊である。例えばInput[Pizza]はPizzaの塊である。Input.Emptyはenumeratorが空であることを示している。例えば、空のファイルをストリーミングしているEnumeratorである。Input.EOFはenumeratorが終わりに到達したことを示している。例えば、ファイルをストリミングしてファイルの終わりに到達したEnumeratorである。
チャンクの種類と上で説明した状態(まだデータがある/ない/もう要素がない)とを比較することができる。
実際、Enumerator[E]はInput[E]を含んでいるためInput[E]を中に入れることができる:
// create an enumerator containing one chunk of pizza val pizza = Pizza("napolitana") val enumerator: Enumerator[Pizza] = Enumerator.enumInput(Input.el(pizza)) // create an enumerator containing no pizza val enumerator: Enumerator[Pizza] = Enumerator.enumInput(Input.Empty)
Enumeratorはノンブロッキングな生産者である
Play2の裏を支える理念は、知ってるかもしれないが、完全にノンブロッキングで非同期であることだ。そのように、Enumerator/Iterateeはこの哲学を反映している。Enumeratorはチャンクを完全に非同期でノンブロッキングな方法で生成する。これはEnumeratorのコンセプトははじめからアクティブプロセスやデータのチャンクを生成するバックグラウンドタスクと結び付いていたわけではないことを意味している。
上に挙げたコードのdateGeneratorがまさしく非同期でノンブロッキングなEnumerator/Iterateeの性質を表わしているのを覚えていますか?
// an Enumerator generated by a callback // it generates a string containing current time every 500 milliseconds // notice the Promise.timeout which provide a non-blocking mechanism val dateGenerator: Enumerator[String] = Enumerator.generateM( play.api.libs.concurrent.Promise.timeout( Some("current time %s".format((new java.util.Date()))), 500 ) )
Promiseとは何か?
しゃんと説明するにはもう一つ記事が必要になってしまうのだが、とにかくPromiseという名前はそれがする事に対応していると言えるだろう。Promise[String]が意味する所とは、《未来に向けてStringを提供する(か、エラーを返す)》ただそれだけだ。そして、Promiseは現行のスレッドをブロックせず、ただ流していく。
Enumeratorはその生成したデータの消費者を必要とする
ノンブロッキングであるというその性質のために、もし誰もその生成されたチャンクを消費しなければ、Enumertorは何もブロックせず、そして何も隠れたランタイムリソースを消費しない。
つまり、Enumeratorがデータの塊を生成するのは、恐らくそれを消費する者が居るときだけだ。
さてEnumeratorが生成したデータの塊を消費するのは一体誰なのだろう?
そう、まさしくあなたはここでIterateeの概念を導いたことになる。
><> Iterateeについて ><>
IterateeはEnumeratorを繰返して処理できる一般的な「何か」
Let’s be windy for one sentence:
Iterateeは、繰返しの概念を純粋関数型プログラミング向けに一般化して置き換えたものだ。Iteratorがコレクションを繰返すために作られているのに対して、IterateeはEnumeratorが繰返されるのを待つ一般的な存在である。
IteratorとIterateeとの違いが分かるだろうか? 分からなくても大丈夫だ。ただこれだけの事を覚えていれば、それでいい。
IterateeはEnumerator(など)が生成したデータの塊を繰返し処理する一般的な存在であるIterateeはそれが繰返し処理するEnumeratorとは独立して作られ、Enumeratorはそれに提供されるIterateeはイミュータブル且つステートレスで、異なるEnumeratorへ完全に再利用可能である
これこそが、
IterateeはEnumeratorに適用されるかEnumerator`を走査する と言える理由なのだ。
Enumerator[Int]の全要素の合計を計算する先程の例を覚えているだろうか?
ここに示すのはIterateeは一度作成されると異なるEnumeratorに対して複数回再利用できることを示す同じ内容のコードだ。
val iterator = Iteratee.fold(0){ (total, elt) => total + elt } val e1 = Enumerator(1, 234, 455, 987) val e2 = Enumerator(345, 123, 476, 187687) // we apply the iterator on the enumerator e1(iterator) // or e1.apply(iterator) e2(iterator) // we run the iterator over the enumerator to get a result val result1 = e1.run(iterator) // or e1 run iterator val result2 = e2.run(iterator)
Enumerator.applyとEnumerator.runはやや異なる関数だが、これについては後程触れることにする。
Iterateeはデータの塊を能動的に消費する
標準では、Iterateeは最初のデータの塊を待ち、その直後に繰返しの機構を開始する。Iterateeは処理が完了したと判断するまでデータの消費を続ける。
一度初期化されれば、Iterateeは完全な繰返しの過程に完全に応答できるようになり、停止する時を判断できる。
// creates the iteratee val iterator = Iteratee.fold(0){ (total, elt) => total + elt } // creates an enumerator val e = Enumerator(1, 234, 455, 987) // this injects the enumerator into the iteratee // = pushes the first chunk of data into the iteratee enumerator(iterator) // the iteratee then consumes as many chunks as it requires // don't bother about the result of this, we will explain later
上で説明されている通り、Enumeratorはデータの塊の生産者であり、消費者がデータの塊を消費することを期待している。
消費/繰返しされるためには、EnumeratorはIterateeに注入/接続されなければならない。より正確に言うと、最初のデータの塊はIterateeに注入/プッシュされなければならない。
その性質上IterateeはEnumeratorの生産速度に依存してしまう。もしEnumeratorが低速ならば、Iterateeも低速になる。
IterateeとEnumeratorの関係は支配逆転(Inversion of control)や依存性注入(Dependency Injection)パターンに合わせたものであると捉えることもできるということを分かっていただきたい。
Iterateeは「1-chunk-loop」関数である
Iterateeはチャンクを、繰返しが終了したと判断するまで一つづつ消費する。
実際、Iterateeの現実のスコープは一つのチャンクの処理に限られている。これがIterateeが一つのデータの塊を消費できる関数として定義できる理由だ。
Iterateeは静的に型付けされた塊を受け付け、静的に型付けされた結果を計算する
Iteratorが、それを生成したコレクションからやって来るデータの塊を繰返し処理するのに対し、Iterateeは若干さらに意欲的だ。データの塊を消費する間、何かを計算することができるのだ。
この証左として、Itaratorのシグネチャは以下に示す通りである。
trait Iteratee[E, +A] // E is the type of data contained in chunks. So it can only be applied on a Enumerator[E] // A is the result of the iteration
最初の例示に戻ってみよう。Enumerator[Int]が生成した全ての整数の合計を計算するものだ。
// creates the iteratee val iterator = Iteratee.fold(0){ (total, elt) => total + elt } val e = Enumerator(1, 234, 455, 987) // runs the iteratee over the enumerator and retrieve the result val total: Promise[Int] = enumerator run iterator
runの使い方に注目してほしい。非同期の世界に我々は居るから、その結果は合計そのものではなく合計のPromise[Int]が返っているのが分かると思う。 本当の合計を得るには、scalaの同時処理をブロックするAwait._関数を使うことができる。しかしこれはブロッキングなAPIだから良くないやり方だ。Play2は完全に非同期でノンブロッキングだから、一番良いやり方はpromiseをPromise.map/flatMapを使って伝播させることだ。
しかし結果は強制的でない。例として、全ての消費されたチャンクをただprintlnしてみよう。
// creates the iteratee val e = Enumerator(1, 234, 455, 987) e(Iteratee.foreach( println _ )) // or e.apply(Iteratee.foreach( println _ )) // yes here the usage of _ is so trivial that you shall use it
返ってくる結果はプリミティブな型でなくてもよく、以下の例のように全てのチャンクを連結したリストでもよい。
val enumerator = Enumerator(1, 234, 455, 987) val list: Promise[List[Int]] = enumerator run Iteratee.getChunks[Int]
Iterateeは繰返しにかけてイミュータブルなコンテキストと状態を伝播できる
最終的な合計を得るには、Iterateeは繰返しの過程で部分的な合計を伝播していかなければならない。
これはIterateeが直前のステップからコンテキスト(例では直前の合計)を受け取ることができ、新たなコンテキストを現在のデータの塊から計算でき(新たな合計 = 直前の合計 + 現在の要素)、最終的にこのコンテキストを次のステップ(次のステップに行く必要があるとき)に伝播させることができるという事を意味している。
単に、Iterateeはステートマシンである
まあそれはそれでいいとして、どうやってIterateeは繰り返しを停止すべきかを判断するのだろうか?
エラーが発生したりEOFが存在したり、Enumeratorの終端に到達したときに何が起こるのだろう?
それ故にコンテキストに加えて、Iterateeは前の状態も受け取り、何をすべきかを考え、次のステップに移るための新たな状態を潜在的に計算する。
さあ、前述した従来の繰り返しでの「状態」を思い出してほしい。繰り返しの過程においてIterateeがとりうる状態として、ほぼ同じ二つの状態が存在する。
Cont状態 : 繰り返しが次のチャンクとともに継続可能で潜在的に新たなコンテキストを計算するDone状態 : 処理の終端に到達したことを通知し、結果となるコンテキストの値を返す
そして3つめの状態ははっきり論理的に見える:
Error状態 : 現在のステップにおいてエラーが発生したことを通知し繰り返しを中断する
この観点からは、
ItateteeはDoneかErrorという終着条件を検出するまでCont状態を繰り返すことだけを任された単なるステートマシンだと捉えることができます。
Iterateeがとる状態Done/Error/Contも同じくIterateeである
ここで、Iterateeは1チャンクごとにループする関数であることと、その主な目的はある状態を次の状態に遷移させることだということを確認してほしい。
そして、3つの【状態】のIterateeが存在する。
Done[E, A](a: A, remaining: Input[E])
- 前のステップから受け取ったコンテキストを表わす
a:A。 - 次のチャンクを表わす
remaining: Input[E]。
Error[E](msg: String, input: Input[E])
- とても簡潔にこう理解することもできる: エラーメッセージとそれを失敗させた入力。
Cont[E, A](k: Input[E] => Iteratee[E, A])
これは、Input[E]を取り他のIteratee[E, A]を返す関数から構成されているが故に、最も複雑な【状態】である。Input[E] => Iteratee[E, A]という関数が、一つの入力を消費しDoneかErrorの状態に到達するまで〈他の入力を受け取り他の新たな状態/iterateeを返す〉新たな状態/iterateeを返す*1...ための簡潔なよい方法だということは、理論の深みにあまり近付かずとも簡単に分かると思う。
この構造は繰り返しの機構を(典型的な関数型のやり方で)確実に成り立たせることができる。
さて、これまでの解説はとても多くの情報に感じたはずだ。きっとあなたは、何故こんなに私が一から十まで説明するのか疑問に思っていることだろう。これは全て、このことが理解できればIterateeを自在に作成する方法を理解できるようになるからなのだ。
例を示すためにEnumerator[Int]の最初の二つの要素の合計を計算するIterateeを書いてみよう。
// Defines the Iteratee[Int, Int] def total2Chunks: Iteratee[Int, Int] = { // `step` function is the consuming function receiving previous context (idx, total) and current chunk // context : (idx, total) idx is the index to count loops def step(idx: Int, total: Int)(i: Input[Int]): Iteratee[Int, Int] = i match { // chunk is EOF or Empty => simply stops iteration by triggering state Done with current total case Input.EOF | Input.Empty => Done(total, Input.EOF) // found one chunk case Input.El(e) => // if first or 2nd chunk, call `step` again by incrementing idx and computing new total if(idx < 2) Cont[Int, Int](i => step(idx+1, total + e)(i)) // if reached 2nd chunk, stop iterating else Done(total, Input.EOF) } // initiates iteration by initialize context and first state (Cont) and launching iteration (Cont[Int, Int](i => step(0, 0)(i))) } // Using it val promiseTotal = Enumerator(10, 20, 5) run total2Chunks promiseTotal.map(println _) => prints 30
受け取ったチャンクの型に依存して新たな
State/Iterateeを返すItareteeを書くことは、それぞれのステップで何をすべきかを選択するのと大差ないのだと、この例で理解できるはずだ。
参考記事
Iteratee - Wikipedia, the free encyclopedia
このスライドを見ると良く理解できる。かも。
Introduction to Iteratees (Scala)