Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

Scalaのfs2でテクニカルなStreamを作りたいときはPullを使うといい

Scalaにはfs2という非同期ストリーミング処理を生産的に行えるライブラリがある。ストリーミング処理なので、沢山のデータがどんどん流れてきたり、ビッグデータ的な情報を扱う企業でよく利用されている。

fs2.io

speakerdeck.com

最初から非同期処理に対応しているので、たくさんのデータを相手にしてもきちんとコアを使い切ったり、複雑なフローを表現できるのがウリだ。

先日このfs2を練習していて、フィボナッチ数列を出力するようなストリームって作れるんかな〜と思い色々調べていたところ、Pullという概念を覚えることで複雑なストリームを構成できることを学んだのでメモ。

利用したScalaとfs2のバージョンは以下の通り:

//> using scala 3.5.0
//> using dep "co.fs2::fs2-core:3.11.0"

フィボナッチ数列を生成したい

なぁ霊夢 フィボナッチ数列って知ってるか

フィボナッチ数列・・・直前の2つの数を足すことを繰り返すことで得られる数列ね

唐突に始まるゆっくり解説・・・

まぁそれはいいとして、フィボナッチ数列の定義はきわめて素朴だ。

  • 最初の2要素は01
  • $fib(n) = fib(n - 1) + fib(n - 2)$

ただこれだけだ。しかしながらこれを素朴にfs2のStreamで表現するのは面倒だ。普通Streamは流れてくる要素を順に処理していくのであって、過去の2要素については普通知らないからだ。フィボナッチ数列を出力するStreamを構成するには、Stream自体がなんとかして状態を持つ必要がある。

状態を持つストリーム

ストリーミング処理がある程度複雑になってくると、状態を持つストリームが出現する。さきほどのフィボナッチ数列のように直前の2要素を参照する必要があったり、他にも入力に対してSHA-256を出力するようなストリームは一定量のデータをチャンクとして蓄積しなければダイジェストの計算ができない。StreamにはscanChunksOptのように状態を持った蓄積処理を行うためのプリミティブが存在するが、状態を持った変形に対してはさほどパワフルではない。

状態を持った複雑な処理を行うにはどのような方法があるだろう?

Streamの中身に迫る

Streamがどのように要素を、つまりストリームを流れてくるIntだのByteだのを生成しているかについて復習しておこう。

まず、Streamは、要素そのものではなく要素の代わりにChunkを生成する。これは何故かというと、本当に要素が1つずつ生成されることは実務上は稀で、たいていは4キロバイトとか1000要素とかそういう単位でデータがやってくる。そのほうが経済的だからだし、ストレージは原理的に4キロバイトといったブロック単位でしかデータの入出力ができないことも多い(ブロックデバイス)。

次に、fs2のアーキテクチャはプル型なので、Streamは要求されない限り要素を送信しない。下流が要素を要求したタイミングでStreamは順に要素を送り出していく。どのように要素を送り出しているのか?その答えはPullにある。

プログラマブルな輪転機、Pull

3つのPullから値を取り出すイメージ

Pullの基礎

Pullとは、Streamが要素を要求した時の振舞いを記述するものだ。振舞いとは、「値を1つ出力」だったり「チャンクを出力」のこともあれば「もう値はないよ、完了です」のこともある。これらを表現するのがPullである。

import fs2.Chunk
import fs2.Pull

Pull.output1(42) // 「42」を1つ出力する

Pull.output(Chunk(0, 1)) // チャンクの形で0と1とを出力する

Pull.done // 要素はないよ、と伝えてストリームを終了する

Pullはモナドになり、一連の連なりとして記述され、頭から実行される。Pullが1つだけの場合もあるし、複数のこともあるし、ループ状になっていることもある。

Pull.output1(0) >> Pull.output1(1) >> Pull.output1(2) // 0を出力し、1を、そして2を出力する

def loop: Poll[IO, Int, Unit] = Pull.output1(6) >> loop // 6, 6, 6, 6, 6... を無限に出力し続ける
// ↑ >>は遅延評価されるのでスタックオーバーフローは発生しない

再帰するときに引数を渡すようにすれば、Pullに状態を持たせることができる:

def loop2(curr: Int): Pull[IO, Int, Unit] =
      Pull.output1(curr) >> loop2(curr + 1)
// loop2(0)で開始すると0, 1, 2, 3, 4... を無限に出力し続ける

Streamが要素を要求したとき、Pullが1ステップ実行される。実行結果とは別に、実行の副作用として要素がStreamへと出力される。このため、PullのシグネチャはPull[Effect, Emit, Result]という3引数になっている。例えば、

Pull.output1(0)

IOを使う場合、上掲のコードのシグネチャはPull[IO, Int, Unit]だ。StreamにIntを吐き出し、その結果としてUnitを返すからだ。

実行に話を戻そう。これでStreamが要求した要素数を満たしているのであれば、Pullはこれ以上実行されない。これがfs2が「プル型」である所以だ。 もしStreamが要求した要素数を返せていないのであれば、Streamを満足するまでPullが実行され続ける。Pullが末端に到達するか、Pull.doneが出力されたとき、そのStreamも停止する。

こうして見てみるとPullIOに似ているが、IOは終了するまでどんどん次へと実行されていくのに対して、Streamが要求したぶんしか実行されないという点において異なる。

val io = IO.println("foo") >> IO.println("bar") // 2つが連続で自動的に実行される

val pull = Pull.output1("foo") >> Pull.output1("bar") // Streamが要求して初めて必要な分だけ実行される

Pullはモナド

Pullがモナドであるということは、for式で組み合わせていいということだ。

val ioPull: Pull[IO, Int, Unit] = for {
  _ <- Pull.eval(IO(println("Hello, Pull!")))
  n <- Pull.eval(IO(42))
  _ <- Pull.output1(n)
} yield ()

ここで、Pull.evalIOを評価するだけの関数だ。IOはそのままPullでは使えないので、型合わせをするための関数だと思っておけばよい。これを評価すると、42を出力するだけのPullができる。

PullStreamにするにはstreamメソッドを呼ぶだけでよい。

ioPull.stream // Stream[IO, Int]

ここまでの説明で、Pullのイメージをある程度掴んでもらえたのではないかと思う。Pullは、必要に応じて回転する輪転機のようなもので、IOのように構成され、必要な数だけ1ステップずつ順に実行される。

フィボナッチ数列をPullを使って構成する

さて、ここからは実戦だ。フィボナッチ数列を吐き出すようなStreamPullをうまく使って構成してみよう。

フィボナッチ数列の最初2要素は固定だから、まずここを実装する:

import fs2.Stream
import cats.effect.{IO, IOApp}
import fs2.Pull
import fs2.Chunk

val fib0 = Pull.output(Chunk(0, 1))

次に再帰部分を実装する。1つ前の値と今の値が決まれば、次の値が決まるから、引数としてprev: Intcurr: Intを受け取って、Intを吐き出すようなPullを作ればよい:

def fibN(prev: Int, curr: Int): Pull[IO, Int, Unit] = ???

実装はとても簡単だ。オーバーフローをチェックして、値を出力し、再帰させるだけだ。

def fibN(prev: Int, curr: Int): Pull[IO, Int, Unit] = {
  val next = prev + curr
  if next < 0 then Pull.done // stop when overflow
  else Pull.output1(next) >> fibN(curr, next)
}

そして、今しがた作った2つのPullをくっつければ完成だ:

val fib = fib0 >> fibN(0, 1)

動作確認をしよう。fib.streamを呼び出すことでStreamが得られるから、これを1要素ずつ表示させてみよう:

import cats.effect.unsafe.implicits.global

fib.stream.evalMap(IO.println).compile.drain.unsafeRunSync
0
1
1
2
3
5
8
13
21
34
55
89
144
233
377
610
987
1597
...

やった〜。フィボナッチ数列を出力するStreamであるfibを実装できた。

まとめ

この記事では、以下のことを確認した。

  • StreamPullが実行されていくことで値を得ている
  • Pullは必要になるまで実行されず、必要な数だけ実行される
  • Pullはモナドなので、>>flatMapfor式を利用して組み立てることができる
  • Pullを再帰的に組み立てることができ、引数と組み合わせることで状態を持つことができる
  • Pull.streamStreamに変換できる

参考文献

blog.rockthejvm.com

devon-miller.gitbook.io

mpilquist.github.io

★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?