巨大テキストファイルが--------\nとかで区切られていて、その単位で変形して使いたいときに使える技。クソデカといっても今回は10MiBくらいでした。ぜんぜんデカくなくてごめんね。
fs2のバージョンは3.9.2。
いきなり解を示すと、.repartition(s => fs2.Chunk.array(s.split(EntrySplitter)))みたいなのを書けばよいです:
//> using scala 3.3.0 //> using dep "co.fs2::fs2-core:3.9.2" //> using dep "co.fs2::fs2-io:3.9.2" import cats.effect.ExitCode import cats.effect.IOApp import cats.effect.IO import fs2.text import fs2.io.file.{Files, Path} val EntrySplitter = "--------\n" type Entry = ??? object Main extends IOApp { def run(args: List[String]) = Files[IO] .readAll(Path(args(0))) .through(text.utf8Decode) .repartition(s => fs2.Chunk.array(s.split(EntrySplitter))) .map(parseEntry) .map { case None => fs2.Chunk.empty case Some(value) => fs2.Chunk(value) } .flatMap(fs2.Stream.chunk) // あとは煮るなり焼くなり .compile .drain >> IO.pure( ExitCode.Success ) val parseEntry: String => Option[Entry] = ??? } Main.main(args)
ミソ
- readAll
した時点ではfs2くんはバッファいっぱい、たとえば現行バージョンだと64 * 1024`バイトを1チャンクとして読んでくる仕様 .repartition(def repartition[O2 >: O](f: O2 => Chunk[O2])(implicit S: Semigroup[O2]): Stream[F, O2])を呼び出すとチャンクの単位を修正できる- やってくる単位は
Stringなので、これを.splitしてChunkに詰め直してやればよい - splitした結果から
Chunkを作るにはChunk.arrayに渡すだけでよい。
- やってくる単位は
- 詰め替えた結果としてチャンク単位の処理になるので、そのまま
mapしてパース処理をしてやればよい- パースが失敗するかもしれないので
Option[Entry]みたいな型で返してやる - さらに
mapしてChunkにしてやることでSomeの結果だけ抜き出せる(collectSomeみたいなやつないのかな?)
- パースが失敗するかもしれないので
- 最後の
.flatMap(fs2.Stream.chunk)はイディオムで、チャンクをflattenして普通のストリームに戻している- これでパース結果のStreamになる