Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

Cats EffectのHotswapの使い方について調べた / 無限に出てくるトイレットペーパーを実装する

Cats Effectの勉強をしてたらHotswapというコンポーネントがあって便利そうだったので紹介します。

Hotswap

cats.effect.std.Hotswap はCats Effectのコンポーネントで、その名の通りResourceをなめらかに交換することができます。

公式サイトにはひととおりの説明が書かれているのですがすこしわかりにくかったのでこの記事で説明してみます。

typelevel.org

Resourceを単体で使う欠点

Hotswapの説明の前にResourceの説明も軽くしておきます。

そもそもResourceとは

プログラムにおいて、取得したら必ず解放しなければならないリソースというものが存在します。そうするべき理由は様々ですが、例えばファイルハンドルやTCPコネクションといった要素がこれに該当します。開いたままにしておくと、リソースは枯渇してしまいます。

Cats Effectではこのようなリソースを安全に解放するために、Resourceという概念を使ってリソースの解放を自動化しています。 Resourceとは、取得操作と解放操作のペアを抽象化した概念です。リソースはResource.makeを使って定義でき、実際のリソースの利用時は.useを使います。 他の言語にも似たような機構がありますが、Cats Effectではcats.effect.IOの基盤の上に設計されているため、合成をやりやすくなっています。

// リソースを返す操作を定義する
val open = IO.println(s"Opening $path ...") *> IO.pure(ActualFile(path))
val close = (f: ActualFile) => IO.println(s"Closing ${f.path} ...")
def file(path: String) = Resource.make(open)(close)

// useすると、確保されたリソースがスコープに引数として渡ってくる
// 取得と解放は自動的に行われる
def run = file("/foo/bar").use { f =>
  IO.println(s"Using ${f.path}")
}

/*  実行結果
Opening /foo/bar ...
Using /foo/bar
Closing /foo/bar ...
*/

ResourceflatMapで複数合成することができ、同時に複数のリソースを使うことを表現できます。リソースの取得と解放はやはり自動的に行われます。

def run2 = {
  val fx = for {
    f1 <- file("/foo/bar")
    f2 <- file("/fizz/buzz")
    f3 <- file("/hoge/fuga")
  } yield (f1, f2, f3)

  fx.use { files =>
    IO.println(s"Using ${files._1.path}, ${files._2.path}, and ${files._3.path}")
  }
}

/* 実行結果
Opening /foo/bar ...
Opening /fizz/buzz ...
Opening /hoge/fuga ...
Using /foo/bar, /fizz/buzz, and /hoge/fuga
Closing /hoge/fuga ...
Closing /fizz/buzz ...
Closing /foo/bar ...
*/

Resourceにできないこと

しかしResourceにも欠点があります。それはResource.useのブロック中でその内容を交換できないことです。 リソースの交換とは、古いリソースを解放し、新たなリソースを確保することです。

例えば、Resourceの内容を交換する以下のようなシチュエーションが考えられそうです:

  • ログファイルが一定サイズになったら交換しなければならない
  • 一定時間経過したらコネクションを作成しなおさなければならない

このような操作をResource単体で表現できるでしょうか:

def run3 = file("/var/log/scalalog").use { logFile =>
  var writtenBytes = 0
  (
    for {
      _ <- write("message", logFile)
      _ <- IO {
        writtenBytes = writtenBytes + 7
      }
      _ <- if (writtenBytes > 100) {
        ???
      } else IO.unit
    } yield ()
  ).foreverM
}

ActualFilefile().useを経由しなければ入手できず、file()Resourceを返すため、直接Resourceの中身を交換することは困難です。スコープの中でさらにResourceを開くという手もありますが、すると古いResourceは解放されずにプログラムが終了するまで確保されたままになるので、メモリリークが発生してしまいます。もちろんResource.useのスコープから抜ければリソースを交換できますが、そうすると続きの処理を実行できません。合成しにくくなってしまいますね。

「ここで交換する」とだけ宣言して、後はそのまま続きの処理を書けるようになっていると設計しやすくなりそうです。

まとめると以下の通りです。

  • リソースを確保する処理と解放する処理とをペアにして使い易くしたものがResourceである
  • Resource.use { ... }すると、確保処理と解放処理が代行され、ブロックの中でのみリソースを操作できるようになる
  • ResourceflatMapで合成可能だが、ブロック内部でリソースを交換する能力は持たない

あたかも単一のResourceを使っているように見せかけつつ、内部でリソースを交換する方法は無いのでしょうか?

Hotswap登場

Hotswapは、Resourceにそのスコープ内で内容を交換する能力を付与します。しかも、交換時には自動的に古いResourceは解放されます。

どのような仕組みでHotswapがResourceの内容を交換するかというと、リソース交換用のハンドルのようなものをHotswapが提供するので、ハンドルの.swap()メソッドを呼び出すことで解放と交換が自動的に行われる仕組みになっています。

また、その性質上、HotswapのハンドルもまたResourceとして提供されます。

注意点として、参照透過性を守るために同じ変数が指しているリソースの内容は変更されず、新たなリソースの内容が新たな変数に束縛されます。

def write(message: String, file: ActualFile): IO[Unit] =
  IO.println(s"writing [$message] to ${file.path}")
def run = Hotswap[IO, ActualFile](file("/var/log/scalalog")).use {
  case (swapHandle, initialLogFile) =>
    for {
      _ <- write("hello", initialLogFile)
      swapped <- swapHandle.swap(file("/var/log/otherlog"))
      _ <- write("world", swapped)
    } yield ()
}

これを実行すると、まず1つ目のResourceが確保され、.swap()を呼び出したタイミングで2つめのResourceの確保と1つ目のResourceの解放が行われます。そして最後に2つ目のResourceの解放が行われます。

1つ目のResourceが解放された後にその中身にアクセスしてはいけないので、実際の環境ではRefを挟んで同じ名前で透過的に中身を切り替えるとよさそうです。

def run = Hotswap[IO, ActualFile](file("/var/log/scalalog")).use {
  case (swapHandle, initialLogFile) =>
    for {
      logFileRef <- Ref[IO].of(initialLogFile)
      swap <- IO.pure((newFileRes: Resource[IO, ActualFile]) =>
        for {
          newf <- swapHandle.swap(newFileRes)
          _ <- logFileRef.set(newf)
        } yield ())
      f <- logFileRef.get
      _ <- write("hello", f)
      _ <- swap(file("/var/log/anotherFile"))
      f2 <- logFileRef.get
      _ <- write("world", f2)
    } yield ()
}
Opening /var/log/scalalog ...
writing [hello] to /var/log/scalalog
Opening /var/log/otherlog ...
Closing /var/log/scalalog ...
writing [world] to /var/log/otherlog
Closing /var/log/otherlog ...

まとめると以下の通りです。

  • Hotswapの責務は古いResourceの解放処理を呼び出し、新たに渡されたResourceの確保処理を呼び出すこと
  • swap()すると、新たに渡されたResourceにより確保されたリソースが返される
  • Hotswap自体もResourceであり、Hotswap.useを使って呼び出す

応用: 途切れないトイレットペーパー

Hotswapのミニマルな例として、途切れることのないトイレットペーパーを実装してみます。

  • トイレットペーパーは使うたびに残量が減っていき、残量が0の状態で使うと例外が送出されます。
  • さらに、使い切ったら残った芯を捨てる必要があります。

まさに管理が必要なリソースですね。トイレットペーパーをラップして、残量が0になった瞬間に新たなトイレットペーパーが補充されるようなリソースを実装してみます。

ToiletPaperクラス

ToiletPaper クラスは以下のような実装となっています。

// トイレットペーパーのある一定の部分
case class PaperPiece(offset: Int) extends AnyVal

/** トイレットペーパー
  *
  * すべて使った後は適切に芯を捨てなければならない
  */
case class ToiletPaper(val initialLasting: Int = 30) {
  private var lasting_ = initialLasting
  def lasting = lasting_
  def dispose(): Unit = {
    println("Disposing toilet paper")
  }
  def pull(): PaperPiece = {
    if (lasting == 0) {
      // ペーパーが切れると大変なことになる
      throw new Exception("Toilet apocalypse!!!!!!!!!!")
    }
    val piece = PaperPiece(initialLasting - lasting)
    lasting_ = lasting_ - 1
    piece
  }
  def status: String = ("*" * lasting) ++ ("_" * (initialLasting - lasting))
}

見ての通り、ペーパー切れの状態でpull()を呼び出すと例外が送出されます。

toiletPaperResource()

トイレットペーパーは手動でdispose()を呼ばなければならないので、まずやるべきことは、使いやすいResource表現を作成することです:

val acquireToiletPaperIO =
  IO.println("\nAcquiring toilet paper") *> IO(ToiletPaper())

val disposeToiletPaperIO =  (old: ToiletPaper) => IO(old.dispose())

// 確保と解放のIO操作のペアを元にResourceを作成する
val toiletPaperResource: Resource[IO, ToiletPaper] =
  Resource.make(acquireToiletPaperIO)(disposeToiletPaperIO)

あくまでもtoiletPaperResourceResourceなので、実際にuse相当の処理を行わない限りリソース取得処理は走りません。「リソースの表現」を定義しているのだと言うと分かりやすいと思います。

toiletPaperResource.use { paper =>
  // ここでようやくIO.println("\nAcquiring toilet paper") *> IO(ToiletPaper())が走る
  ...
} // ここでold => IO(old.dispose()が走る

automatedPaper0 -- 型定義

さて、Hotswapを使って自動化されたリソースの交換を行います。 ちょっと仕様を整理します。

  • 内部的に必要なリソースは、最初から入れておくToiletPaperと、それを新たなペーパーに交換するためのHotswapさえあれば良さそう
  • ユーザに可能な操作は、PaperPieceを受け取る操作のみ

型で表すとこういった感じですね。

type HotswapPair = (Hotswap[IO, ToiletPaper], ToiletPaper)
type ResourceForPaperPiece = Resource[IO, () => IO[PaperPiece]]

この2つがあれば、ペーパーを取り出す処理が実装できそうに思えます。

val automatedPaper0: HotswapPair => ResourceForPaperPiece = ???

automatedPaper0 -- 実装

全体

いったん全体の形を示します。

val automatedPaper0: HotswapPair => ResourceForPaperPiece = {
  case (hotswap, initialPaper) =>
    Resource.eval {
      for {
        ...             // 内部的な値を定義して・・・
      } yield () => ??? // 0-引数関数を返す
    }                   // それをResource化する
}

Resource.evalResource.makeの解放処理を省略したバージョンです。リソース解放処理は内部でHotswapに一任しているため、今回は必要ありません。

内部的なペーパーの管理

paperHolder

Hotswapがやってくれるのは古いリソースの解放までなので、新たなリソースへと参照を切り替えるのは実装側の責務です。 Refを使って、透過的にリソースにアクセスできるようにしましょう。Refの初期値は既に渡っているinitialPaperです。

val automatedPaper0: HotswapPair => ResourceForPaperPiece = {
  case (hotswap, initialPaper) =>
    Resource.eval {
      for {
        // 内部的にペーパーの管理にRefを使う
        paperHolder <- Ref[IO].of(initialPaper)
      } yield () => ??? // この中ではpaperHolderを使って参照する
    }
}
swap

紙が切れた場合の操作を簡単にするために、以下の処理をまとめて定義しておきます:

  • hotswap.swap()を呼び出して古いリソースを解放し、新たなToiletPaperを得る
  • paperHolder.set()を呼び出して、新たなToiletPaperを向くように参照を切り替える

これをswapという名前で定義しておきましょう。

val automatedPaper0: HotswapPair => ResourceForPaperPiece = {
  case (hotswap, initialPaper) =>
    Resource.eval {
      for {
        paperHolder <- Ref[IO].of(initialPaper)
        // 新たなToiletPaperを調達してRefに入れる処理をワンセットで定義し、
        // 既に捨てたペーパーに対して操作することを防ぐ
        swap <- IO.pure { (newPaperResource: Resource[IO, ToiletPaper]) =>
          for {
            // 新たなペーパーの入手と古いペーパーの解放を行う
            newPaper <- hotswap.swap(newPaperResource)
            // Refにセットする
            _ <- paperHolder.set(newPaper)
          } yield () }
      } yield () => ??? // この中では`swap`を呼ぶだけでよい
    }
}
関数定義

最後に、ユーザへと返される0-引数関数を定義します。この関数でやるべきことは以下の通りです:

  • paperHolderを使って紙を取り出す
  • ステータスを表示する
  • 紙の残量が0になっていたら、swapを呼び出す

これらの操作は全てIOなので、forを使って組み立てます。

val automatedPaper0: HotswapPair => ResourceForPaperPiece = {
  case (hotswap, initialPaper) =>
    Resource.eval {
      for {
        paperHolder <- Ref[IO].of(initialPaper)
        swap <- IO.pure { (newPaperResource: Resource[IO, ToiletPaper]) =>
          for {
            newPaper <- hotswap.swap(newPaperResource)
            _ <- paperHolder.set(newPaper)
          } yield () }
      } yield () =>
      for {
        // 紙を取り出し、必要に応じてswapを呼び出し、PaperPieceを返す
        piece <- paperHolder.modify { p => (p, p.pull()) }
        roll <- paperHolder.get
        _ <- IO.print(s"\r${roll.status}")
        _ <- if (roll.lasting == 0) swap(toiletPaperResource) else IO.unit
      } yield piece
    }
}

やったー!automatedPaper0が完成したぞ!

automatedPaper

Hotswapが渡されたときに動く中身が完成したので、実際にHotswapResourceとを渡しましょう。といってもこれは簡単で、Hotswap.applyflatMapを組み合わせたら完成します:

val automatedPaper = Hotswap(toiletPaperResource) >>= automatedPaper0

実は最初からHotswap.applyを起点に考えることもできたのですが、それだと考えることが増えてしまうので、いったん中身を組み立ててから最後にapplyをポン付けすることにしました。

automatedPaperを使う

実際に使ってみましょう。

object Main extends IOApp.Simple {
  import scala.concurrent.duration._
  import scala.language.postfixOps
  def run = automatedPaper.use { pullPaper =>
    {
      for {
        _ <- pullPaper() // 紙を引くことだけができる
        r <- Random.scalaUtilRandom[IO]
        wait <- r.nextGaussian map (_.abs * 50)
        _ <- IO.sleep(wait milliseconds)
      } yield ()
    }.foreverM
  }
}

「ランダム時間待ってから紙を引く」を永遠に繰り返してもらいます。

Image from Gyazo

自動的に内部でリソースが交換され続けている様子が分かります。

まとめ

Hotswapを作成したい場合は、以下の順で考えていくと良さそうです。

  • ユーザからは何を隠蔽し、どのような操作を可能にしたいか考える
  • 可能な操作をResource.evalで返すようにする
  • たいていforを使うことになる。forの中で内部的な状態を定義しておき、yieldの中でロジックを定義する
  • Hotswap.applyflatMapでくっつける
  • flatMapuseの使い分け: flatMapは合成なので実際にリソースは確保されないが、useすると確保されるので、基本的にflatMapで組み立てていくことになる

参考文献

今回使ったGistを公開しています。

gist.github.com

★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?