Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

ScalaにGoroutineがやってくる!非同期処理ライブラリOxで遊んだ

Channelスタイルの並行処理の記述を(もちろん型安全に)可能にするライブラリOxについて調べて試してみた。結論から言うと書き味がめちゃくちゃ良くて面白い。

ソースコードも置いておく。

github.com

Ox

Oxとは、sttpなどの開発でお馴染のSoftwareMillによって開発されているScala用の非同期ライブラリである。まだ非常に若く、活発に開発されている。

github.com

Oxの特徴は、というか目的といっても差し支えないのだが、それはChannel指向の非同期処理、つまりGoroutineをScalaの上で実現している点だ。Goユーザならすぐに理解できるだろう。

百聞は一見に如かず。こんな感じのコードを書くことができる(v0.0.25時点)。

import ox.*
import ox.channels.*

import scala.concurrent.duration.*

def channels =
  // チャンネルを作成。rendezvousチャンネルはGoroutineでいうところの「バッファ無しチャンネル」のこと。
  // もちろんバッファ付きチャンネルもある(割愛)。
  val chan1 = Channel.rendezvous[String]
  val chan2 = Channel.rendezvous[String]

  // supervisedで囲っておくことでthrow時にgraceful shutdownが行われる
  supervised {

    // forkUserは要するにgo。この中は別のフォーク(Ox用語。軽量スレッド)で実行される。
    // supervisorは、forkUserで生成されたフォークがすべて完了するまで待機する
    forkUser {

      // chan1からメッセージを取り出して消化していく。処理に1秒かかるという想定
      chan1.foreach { s =>
        sleep(1.second)
        println(s)
      }
    }

    // 同様に別のフォークを起動する。こちらはchan2からStringを受け取り、大文字にして印字する
    forkUser {
      chan2.map(_.toUpperCase).foreach { s =>
        sleep(1.second)
        println(s)
      }
    }

    forkUser {
      // chan1とchan2のうち、先に受け付けたほうに文字列を送る
      select(chan1.sendClause("Hello"), chan2.sendClause("Hello"))

      // 0.5秒待つ
      sleep(0.5.second)

      // この時点で先に受信したチャンネルはまだブロックしている。
      // 再度文字列を送ると、空いているほうに送られる
      select(chan1.sendClause("World"), chan2.sendClause("World"))
      sleep(0.5.second)

      // チャンネルを完了させて閉じる
      chan1.done()
      chan2.done()
    }
  }

これを実行すると以下のように出力される:

Hello
WORLD

OxはProject LoomというJava 17でプレビュー導入され、Java 21以降で導入された軽量スレッド機構を土台として組み立てられているため、Goroutine同様に、大量にforkUserを作成してもパフォーマンス上の打撃を受けにくくなっている。むしろどんどん作ってくださいという感じ。

openjdk.org

この設計上の決定により、OxはScala 3 + JDK 21以上の環境でのみ動作する。

Oxの何が良いか

Oxは軽い書き味ながらも、非同期処理を書くために必要な基礎部品が全部揃っている。たとえばチャンネルを使うまでもないシチュエーションのために、より高級なメソッドが用意されている:

// 全ての関数を非同期に実行し、先に完了したものの値を採用する。どれかが完了した時点で他のフォークは終了される
race(
  () => { sleep(1.second); println("Hello") },
  () => { sleep(2.second); println("World") },
)()
// => Hello

// 全ての関数を非同期に実行し、全てが揃うまで待機する。
par(
  () => {
    println("Starting first fiber")
    sleep(1.second)
    println("First done")
    42
  },
  () => {
    println("Starting second fiber")
    sleep(2.second)
    println("Second done")
    666
  },
)
=> (42, 666)

他のライブラリだと専門的な知識や事前に重厚な準備が必要になることが多いのだが、Oxは初見でもほぼ正しく書くことができる。

単純に非同期に実行するだけでなく、exponential backoff付きリトライ機構やフォークの事後キャンセルなどの仕組みも最初から用意されているため、手作りしなければならない箇所が少なくてすむ。これにより、ちょっとした非同期処理からチャンネルを駆使した非同期処理までソツなくこなす柔軟性を手にしている。ボイラープレート地獄から人間を解放してもらおう。

// タイムアウト機構
def controlingTimeout =
  def computation = {
    println("starting heavy computation")
    sleep(2.second)
    println("heavy computation done")
    42
  }
  // 1秒まで待つ。超えると例外を発出する。TryにキャッチされてFailureになる
  val result = Try(timeout(1.second)(computation))
  println(result)
// リトライ機構
def retryingExecution =
  def randomlyFail = {
    if (math.random() < 0.8) {
      println("Failed")
      throw new RuntimeException("boom!")
    }
    println("Succeeded")
    42
  }

  // exponential backoffしながらリトライする。ジッター機構完備
  Try(
    ox.retry.retry(
      RetryPolicy.backoff(
        maxRetries = 10,
        initialDelay = 100.millis,
        maxDelay = 1.second,
        jitter = Jitter.Equal,
      ),
    )(randomlyFail),
  )
// 4並列ストリーム処理
def transformation =
  supervised {
    Source
      .iterate(0)(_ + 1) // natural number
      .filter(_ % 2 == 0) // even number
      .mapParUnordered(4) { n =>
        sleep((Math.random * 100).millis)
        n + 1
      // add 1
      }
      .take(100)
      .foreach(n => println(n.toString))
  }

また、もちろんScalaの型システムの上に載っているので、ジェネリクスや型クラスなども当然扱えるのが嬉しいところ。がんばって終了を待機する処理を手書きしたり、リトライ機構を手で実装しなくてもいい。

Oxを使うために必要なもの

前述したように、OxはJDK 21で取り込まれたLoomという軽量スレッド機構の上に組み立てられたライブラリだ。したがって今のところScala.jsやネイティブバイナリへのコンパイルは想定されていない(用途上、瞬時に起動してほしいユースケースはあまりないだろうから問題ないはずだ)。したがってJDK 21以上の実行環境が必要だ。

ライブラリ依存性としては1つ追加するだけでよい:

// build.sbt
// ...
libraryDependencies ++= Seq(
  "com.softwaremill.ox" %% "core" % "0.0.25"
),

Oxの注意点

Oxはまだ非常に若いライブラリだ。最後のリリースだってわずか8日前だし、どんどん機能が追加されたり、breaking changesが起こっている。したがって今すぐプロダクション環境に組込むことはおすすめできない状況にある。だがScalaにGoroutineライクな機構が持ち込まれることは福音に違いない。Scalaの世界では伝統的にスレッドかスレッドプール上に作られたシステムでやりくりしていた。Oxは直接スレッドを扱うメンタルモデルが適さない局面で、とても良いDXをプログラマに提供すると思う。大量のデータが流れ込むバッチ処理で、JVMのJIT処理の助けを借りて高速に複雑な非同期処理を行えると想像してみてほしい!間違いなく面白いものになると思うからこうして記事にして紹介することにしたというわけ。

所感

前述した通り、Scalaでこれまで非同期処理を扱うために必要なのはスレッドだった。軽量スレッドを扱いたい場合は、かなりの学習コストを支払ってCats Effectなどの非同期ライブラリを使いこなす必要があったし、ランタイムの知識も相応に必要とされた。ちょっとした用途ではとうていこれは許容できない。とにかく、効率的な非同期処理はは手に余るやつだった。

Oxは純粋関数型プログラミングの博士号を持っていない人間にも、効率的で生産的な非同期処理のための門戸を開いてくれる。これまではGoroutineがその専売特許を持っていたが、OxによってScalaにも同様のパラダイムがもたらされるのだ。今や巻き返しの時です。

ちょっとコードを書いて遊んでみてほしい。Cats Effectがキャデラックなら、こいつはスーパーカブ110だ。

★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?