Scalaにはfs2という非同期ストリーミング処理を生産的に行えるライブラリがある。ストリーミング処理なので、沢山のデータがどんどん流れてきたり、ビッグデータ的な情報を扱う企業でよく利用されている。
最初から非同期処理に対応しているので、たくさんのデータを相手にしてもきちんとコアを使い切ったり、複雑なフローを表現できるのがウリだ。
先日このfs2を練習していて、フィボナッチ数列を出力するようなストリームって作れるんかな〜と思い色々調べていたところ、Pull
という概念を覚えることで複雑なストリームを構成できることを学んだのでメモ。
利用したScalaとfs2のバージョンは以下の通り:
//> using scala 3.5.0 //> using dep "co.fs2::fs2-core:3.11.0"
フィボナッチ数列を生成したい
なぁ霊夢 フィボナッチ数列って知ってるか
フィボナッチ数列・・・直前の2つの数を足すことを繰り返すことで得られる数列ね
唐突に始まるゆっくり解説・・・
まぁそれはいいとして、フィボナッチ数列の定義はきわめて素朴だ。
- 最初の2要素は
0
と1
- $fib(n) = fib(n - 1) + fib(n - 2)$
ただこれだけだ。しかしながらこれを素朴にfs2のStream
で表現するのは面倒だ。普通Stream
は流れてくる要素を順に処理していくのであって、過去の2要素については普通知らないからだ。フィボナッチ数列を出力するStream
を構成するには、Stream
自体がなんとかして状態を持つ必要がある。
状態を持つストリーム
ストリーミング処理がある程度複雑になってくると、状態を持つストリームが出現する。さきほどのフィボナッチ数列のように直前の2要素を参照する必要があったり、他にも入力に対してSHA-256を出力するようなストリームは一定量のデータをチャンクとして蓄積しなければダイジェストの計算ができない。Stream
にはscanChunksOpt
のように状態を持った蓄積処理を行うためのプリミティブが存在するが、状態を持った変形に対してはさほどパワフルではない。
状態を持った複雑な処理を行うにはどのような方法があるだろう?
Stream
の中身に迫る
Stream
がどのように要素を、つまりストリームを流れてくるInt
だのByte
だのを生成しているかについて復習しておこう。
まず、Stream
は、要素そのものではなく要素の代わりにChunk
を生成する。これは何故かというと、本当に要素が1つずつ生成されることは実務上は稀で、たいていは4キロバイトとか1000要素とかそういう単位でデータがやってくる。そのほうが経済的だからだし、ストレージは原理的に4キロバイトといったブロック単位でしかデータの入出力ができないことも多い(ブロックデバイス)。
次に、fs2のアーキテクチャはプル型なので、Stream
は要求されない限り要素を送信しない。下流が要素を要求したタイミングでStream
は順に要素を送り出していく。どのように要素を送り出しているのか?その答えはPull
にある。
プログラマブルな輪転機、Pull
Pull
の基礎
Pull
とは、Stream
が要素を要求した時の振舞いを記述するものだ。振舞いとは、「値を1つ出力」だったり「チャンクを出力」のこともあれば「もう値はないよ、完了です」のこともある。これらを表現するのがPull
である。
import fs2.Chunk import fs2.Pull Pull.output1(42) // 「42」を1つ出力する Pull.output(Chunk(0, 1)) // チャンクの形で0と1とを出力する Pull.done // 要素はないよ、と伝えてストリームを終了する
Pull
はモナドになり、一連の連なりとして記述され、頭から実行される。Pull
が1つだけの場合もあるし、複数のこともあるし、ループ状になっていることもある。
Pull.output1(0) >> Pull.output1(1) >> Pull.output1(2) // 0を出力し、1を、そして2を出力する def loop: Poll[IO, Int, Unit] = Pull.output1(6) >> loop // 6, 6, 6, 6, 6... を無限に出力し続ける // ↑ >>は遅延評価されるのでスタックオーバーフローは発生しない
再帰するときに引数を渡すようにすれば、Pullに状態を持たせることができる:
def loop2(curr: Int): Pull[IO, Int, Unit] = Pull.output1(curr) >> loop2(curr + 1) // loop2(0)で開始すると0, 1, 2, 3, 4... を無限に出力し続ける
Stream
が要素を要求したとき、Pull
が1ステップ実行される。実行結果とは別に、実行の副作用として要素がStream
へと出力される。このため、Pull
のシグネチャはPull[Effect, Emit, Result]
という3引数になっている。例えば、
Pull.output1(0)
IO
を使う場合、上掲のコードのシグネチャはPull[IO, Int, Unit]
だ。StreamにInt
を吐き出し、その結果としてUnit
を返すからだ。
実行に話を戻そう。これでStream
が要求した要素数を満たしているのであれば、Pull
はこれ以上実行されない。これがfs2が「プル型」である所以だ。
もしStream
が要求した要素数を返せていないのであれば、Stream
を満足するまでPull
が実行され続ける。Pull
が末端に到達するか、Pull.done
が出力されたとき、そのStream
も停止する。
こうして見てみるとPull
はIO
に似ているが、IO
は終了するまでどんどん次へと実行されていくのに対して、Stream
が要求したぶんしか実行されないという点において異なる。
val io = IO.println("foo") >> IO.println("bar") // 2つが連続で自動的に実行される val pull = Pull.output1("foo") >> Pull.output1("bar") // Streamが要求して初めて必要な分だけ実行される
Pull
はモナド
Pull
がモナドであるということは、for
式で組み合わせていいということだ。
val ioPull: Pull[IO, Int, Unit] = for { _ <- Pull.eval(IO(println("Hello, Pull!"))) n <- Pull.eval(IO(42)) _ <- Pull.output1(n) } yield ()
ここで、Pull.eval
はIO
を評価するだけの関数だ。IO
はそのままPull
では使えないので、型合わせをするための関数だと思っておけばよい。これを評価すると、42
を出力するだけのPull
ができる。
Pull
をStream
にするにはstream
メソッドを呼ぶだけでよい。
ioPull.stream // Stream[IO, Int]
ここまでの説明で、Pull
のイメージをある程度掴んでもらえたのではないかと思う。Pull
は、必要に応じて回転する輪転機のようなもので、IO
のように構成され、必要な数だけ1ステップずつ順に実行される。
フィボナッチ数列をPull
を使って構成する
さて、ここからは実戦だ。フィボナッチ数列を吐き出すようなStream
をPull
をうまく使って構成してみよう。
フィボナッチ数列の最初2要素は固定だから、まずここを実装する:
import fs2.Stream import cats.effect.{IO, IOApp} import fs2.Pull import fs2.Chunk val fib0 = Pull.output(Chunk(0, 1))
次に再帰部分を実装する。1つ前の値と今の値が決まれば、次の値が決まるから、引数としてprev: Int
とcurr: Int
を受け取って、Int
を吐き出すようなPull
を作ればよい:
def fibN(prev: Int, curr: Int): Pull[IO, Int, Unit] = ???
実装はとても簡単だ。オーバーフローをチェックして、値を出力し、再帰させるだけだ。
def fibN(prev: Int, curr: Int): Pull[IO, Int, Unit] = { val next = prev + curr if next < 0 then Pull.done // stop when overflow else Pull.output1(next) >> fibN(curr, next) }
そして、今しがた作った2つのPull
をくっつければ完成だ:
val fib = fib0 >> fibN(0, 1)
動作確認をしよう。fib.stream
を呼び出すことでStream
が得られるから、これを1要素ずつ表示させてみよう:
import cats.effect.unsafe.implicits.global
fib.stream.evalMap(IO.println).compile.drain.unsafeRunSync
0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 ...
やった〜。フィボナッチ数列を出力するStream
であるfib
を実装できた。
まとめ
この記事では、以下のことを確認した。
Stream
はPull
が実行されていくことで値を得ているPull
は必要になるまで実行されず、必要な数だけ実行されるPull
はモナドなので、>>
やflatMap
、for
式を利用して組み立てることができるPull
を再帰的に組み立てることができ、引数と組み合わせることで状態を持つことができるPull
は.stream
でStream
に変換できる