巨大テキストファイルが--------\n
とかで区切られていて、その単位で変形して使いたいときに使える技。クソデカといっても今回は10MiBくらいでした。ぜんぜんデカくなくてごめんね。
fs2のバージョンは3.9.2
。
いきなり解を示すと、.repartition(s => fs2.Chunk.array(s.split(EntrySplitter)))
みたいなのを書けばよいです:
//> using scala 3.3.0 //> using dep "co.fs2::fs2-core:3.9.2" //> using dep "co.fs2::fs2-io:3.9.2" import cats.effect.ExitCode import cats.effect.IOApp import cats.effect.IO import fs2.text import fs2.io.file.{Files, Path} val EntrySplitter = "--------\n" type Entry = ??? object Main extends IOApp { def run(args: List[String]) = Files[IO] .readAll(Path(args(0))) .through(text.utf8Decode) .repartition(s => fs2.Chunk.array(s.split(EntrySplitter))) .map(parseEntry) .map { case None => fs2.Chunk.empty case Some(value) => fs2.Chunk(value) } .flatMap(fs2.Stream.chunk) // あとは煮るなり焼くなり .compile .drain >> IO.pure( ExitCode.Success ) val parseEntry: String => Option[Entry] = ??? } Main.main(args)
ミソ
- readAll
した時点ではfs2くんはバッファいっぱい、たとえば現行バージョンだと
64 * 1024`バイトを1チャンクとして読んでくる仕様 .repartition
(def repartition[O2 >: O](f: O2 => Chunk[O2])(implicit S: Semigroup[O2]): Stream[F, O2]
)を呼び出すとチャンクの単位を修正できる- やってくる単位は
String
なので、これを.split
してChunk
に詰め直してやればよい - splitした結果から
Chunk
を作るにはChunk.array
に渡すだけでよい。
- やってくる単位は
- 詰め替えた結果としてチャンク単位の処理になるので、そのまま
map
してパース処理をしてやればよい- パースが失敗するかもしれないので
Option[Entry]
みたいな型で返してやる - さらに
map
してChunk
にしてやることでSome
の結果だけ抜き出せる(collectSome
みたいなやつないのかな?)
- パースが失敗するかもしれないので
- 最後の
.flatMap(fs2.Stream.chunk)
はイディオムで、チャンクをflattenして普通のストリームに戻している- これでパース結果のStreamになる