Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

Scalaのfs2でクソデカ文字列ストリームを特定のデリミタで分割して処理する

巨大テキストファイルが--------\nとかで区切られていて、その単位で変形して使いたいときに使える技。クソデカといっても今回は10MiBくらいでした。ぜんぜんデカくなくてごめんね。

fs2のバージョンは3.9.2

いきなり解を示すと、.repartition(s => fs2.Chunk.array(s.split(EntrySplitter)))みたいなのを書けばよいです:

//> using scala 3.3.0
//> using dep "co.fs2::fs2-core:3.9.2"
//> using dep "co.fs2::fs2-io:3.9.2"

import cats.effect.ExitCode
import cats.effect.IOApp
import cats.effect.IO
import fs2.text
import fs2.io.file.{Files, Path}

val EntrySplitter = "--------\n"

type Entry = ???

object Main extends IOApp {
  def run(args: List[String]) =
    Files[IO]
      .readAll(Path(args(0)))
      .through(text.utf8Decode)
      .repartition(s => fs2.Chunk.array(s.split(EntrySplitter)))
      .map(parseEntry)
      .map {
        case None        => fs2.Chunk.empty
        case Some(value) => fs2.Chunk(value)
      }
      .flatMap(fs2.Stream.chunk)
      // あとは煮るなり焼くなり
      .compile
      .drain >> IO.pure(
      ExitCode.Success
    )

  val parseEntry: String => Option[Entry] = ???
}

Main.main(args)

ミソ

  • readAllした時点ではfs2くんはバッファいっぱい、たとえば現行バージョンだと64 * 1024`バイトを1チャンクとして読んでくる仕様
  • .repartitiondef repartition[O2 >: O](f: O2 => Chunk[O2])(implicit S: Semigroup[O2]): Stream[F, O2])を呼び出すとチャンクの単位を修正できる
    • やってくる単位はStringなので、これを.splitしてChunkに詰め直してやればよい
    • splitした結果からChunkを作るにはChunk.arrayに渡すだけでよい。
  • 詰め替えた結果としてチャンク単位の処理になるので、そのままmapしてパース処理をしてやればよい
    • パースが失敗するかもしれないのでOption[Entry]みたいな型で返してやる
    • さらにmapしてChunkにしてやることでSomeの結果だけ抜き出せる(collectSomeみたいなやつないのかな?)
  • 最後の.flatMap(fs2.Stream.chunk)はイディオムで、チャンクをflattenして普通のストリームに戻している
    • これでパース結果のStreamになる
★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?