Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

ulimitに死す / ScalaのHTTPクライアントsttpではバックエンドを都度作らずに1つだけ作って使いまわしましょう

ここではsttp4前提で書くがsttp3でもたぶん同じ。

sttpについて説明しておくと、Scalaで利用できるHTTPクライアント実装で、操作体系がシンプルで簡単であり、なおかつ複数のバックエンド(JDK標準のクライアントを使うものとか、もっと並行性制御を強化されたCats Effectなどの上に載ったhttp4sなどを使うものとか、Native環境でも動くlibcurlベースのものなど)を切り替えて透過的に利用できるのが強みだ。現代でHTTPクライアントが必要になったらとりあえずファーストチョイスにしてよいのではないか。

sttp.softwaremill.com

で、ScalaのHTTPクライアントsttpではバックエンドを都度作らずに1つだけ作って使いまわしましょう、という話。

守らないとどうなりますか

java.io.IOException: "Too many open files"などの比較的致命的なエラーが発生する。

以下のように書きましょう

lazy valなどを使って使いまわそう。そしてJVMが死ぬときにバックエンドが閉じるようにしよう。

import sttp.client4.*
import sttp.client4.httpclient.HttpClientSyncBackend

object HttpClientManager {
  private lazy val backend = HttpClientSyncBackend()
  
  def getBackend = backend
  
  def shutdown(): Unit = backend.close()
  
  sys.addShutdownHook(shutdown())
}

object FooBarService {
  def fetch() = {
    val backend = HttpClientManager.getBackend
    val response = basicRequest
      .get(uri"http://example.com/")
      .send(backend)
  }
}

悪い例: fetchするごとにバックエンドを作成する

import sttp.client4.*
import sttp.client4.httpclient.HttpClientSyncBackend

object FooBarService {
  def fetch() = {
    val backend = HttpClientSyncBackend()
    val response = basicRequest
      .get(uri"http://example.com/")
      .send(backend)
  }
}

なんで?

sttpはヘヴィーユースに耐えられるようにするために、コネクションに関連するリソース、例えばスレッドプールやコネクションプールなどをバックエンド単位で管理する。このおかげで、長期間または連続してHTTPリクエストを実行しても、リソースを節約しつつパフォーマンスの良いHTTPリクエストを実現できる。

HttpClientSyncBackendなどを毎回生成すると、こうしたリソースも毎回作ることになってしまう。以下に示す実験コードを実行すると、どんどんファイルディスクリプタが増えることがわかる(たまにGCで回収されている)。これの回収が間に合わなくなると、カーネルのファイルディスクリプタ制限に引っかかってクラッシュする。

他言語のライブラリの仕組みがどうなっているかについて私は詳しくないのだが、すくなくともsttpでは「暗黙のうちにグローバルなコネクションプールが作られ、そこから暗黙的にコネクションが取り出される」ような挙動を行わない。性能やリソースを制御しにくくなるためだ。

毎回バックエンド作らないでね、ということはドキュメントで明確に指示されている:

sttp.softwaremill.com

In case of most backends, you should only instantiate a backend once per application, as a backend typically allocates resources such as thread or connection pools.

検証してみようのコーナー

せっかくなので検証してみよう。バックエンドを都度作る(GCによって回収されるまで放置される)ものと、同じバックエンドを使いまわすものを順に走らせ、ダミーのサーバに大量にリクエストを発射する。

結果は以下のような感じ(テストプログラムは後述)。

% scala-cli package too-many.scala
% scala-cli package http4s-test-server.scala
# サーバ起動
# どんどんリクエストが来るのでレイテンシが小さいGCにしておく
export JAVA_OPTS='-Xmx4G -XX:+UseShenandoahGC -XX:+UseStringDeduplication'
% ./Http4sTestServer
# テストプログラム起動
% ./TooMany all
=== システム情報 ===
OS: Linux
Java Version: 24.0.1
ファイルディスクリプタ制限 (ulimit -n): 524288
現在のファイルディスクリプタ数: 15

=== パターン1: バックエンドを毎回作成して閉じない ===
リクエスト数: 100
現在のファイルディスクリプタ数: 318
リクエスト数: 200
現在のファイルディスクリプタ数: 204
...
リクエスト数: 5000
現在のファイルディスクリプタ数: 1491

=== パターン2: バックエンドを使いまわして大量リクエスト ===
リクエスト数: 100
現在のファイルディスクリプタ数: 21
リクエスト数: 200
現在のファイルディスクリプタ数: 21
...
リクエスト数: 5000
現在のファイルディスクリプタ数: 21

バックエンドを毎回作るとどんどんファイルディスクリプタが増えるのがわかるだろう。

サーバ

ダミーのサーバをClaudeくんに作ってもらった。http4s-test-server.scalaとして保存する。所々雑な気がする(active connの計算とか)けど本筋とは関係ないので見なかったことにする。

//> using dep org.http4s::http4s-ember-server:0.23.30
//> using dep org.http4s::http4s-ember-client:0.23.30
//> using dep org.http4s::http4s-dsl:0.23.30
//> using dep org.http4s::http4s-circe:0.23.30
//> using dep io.circe::circe-generic:0.14.13

import cats.effect.*
import cats.effect.std.Random
import cats.syntax.all.*
import com.comcast.ip4s.*
import org.http4s.*
import org.http4s.dsl.io.*
import org.http4s.ember.server.*
import org.http4s.implicits.*
import org.http4s.server.middleware.Logger
import org.http4s.circe.*
import io.circe.generic.auto.*
import io.circe.syntax.*
import scala.concurrent.duration.*
import java.time.Instant
import java.util.concurrent.atomic.AtomicLong

object Http4sTestServer extends IOApp {

  // レスポンス用のケースクラス
  case class StatusResponse(
    method: String,
    url: String,
    headers: Map[String, String],
    origin: String,
    timestamp: String
  )

  case class DelayResponse(
    delayed: Int,
    method: String,
    url: String,
    timestamp: String
  )

  case class ServerStats(
    totalRequests: Long,
    activeConnections: Long,
    uptime: Long,
    startTime: String
  )

  // リクエストカウンター
  val requestCounter = new AtomicLong(0)
  val activeConnections = new AtomicLong(0)
  val serverStartTime = Instant.now()

  // ルート定義
  def routes(random: Random[IO]): HttpRoutes[IO] = HttpRoutes.of[IO] {

    // ルートエンドポイント
    case req @ GET -> Root =>
      Ok(s"""
        |Http4s Test Server (httpbin alternative)
        |========================================
        |
        |Available endpoints:
        |  GET  /              - This help message
        |  GET  /delay/:seconds - Delay response by N seconds
        |  GET  /status/:code   - Return specific HTTP status code
        |  GET  /get           - Return request details
        |  POST /post          - Echo POST data
        |  GET  /headers       - Return request headers
        |  GET  /random-delay  - Random delay between 0-5 seconds
        |  GET  /stats         - Server statistics
        |  GET  /slow-response - Slow streaming response
        |  GET  /connection-test - Test connection handling
        |
        |Total requests served: ${requestCounter.get()}
        |Active connections: ${activeConnections.get()}
        |""".stripMargin)

    // 遅延レスポンス
    case req @ GET -> Root / "delay" / IntVar(seconds) =>
      val delay = math.min(seconds, 300) // 最大5分
      for {
        _ <- IO(requestCounter.incrementAndGet())
        _ <- IO(activeConnections.incrementAndGet())
        _ <- IO.sleep(delay.seconds)
        _ <- IO(activeConnections.decrementAndGet())
        response <- Ok(DelayResponse(
          delayed = delay,
          method = req.method.name,
          url = req.uri.toString,
          timestamp = Instant.now().toString
        ).asJson)
      } yield response

    // 特定のステータスコードを返す
    case req @ GET -> Root / "status" / IntVar(code) =>
      for {
        _ <- IO(requestCounter.incrementAndGet())
        result <- Status.fromInt(code) match {
          case Right(status) =>
            IO.pure(Response[IO](status = status).withEntity(s"Status code: $code"))
          case Left(_) =>
            BadRequest(s"Invalid status code: $code")
        }
      } yield result

    // GETリクエストの詳細を返す
    case req @ GET -> Root / "get" =>
      IO(requestCounter.incrementAndGet()) *>
      Ok(StatusResponse(
        method = req.method.name,
        url = req.uri.toString,
        headers = req.headers.headers.map(h => h.name.toString -> h.value).toMap,
        origin = req.remoteAddr.map(_.toString).getOrElse("unknown"),
        timestamp = Instant.now().toString
      ).asJson)

    // POSTデータをエコー
    case req @ POST -> Root / "post" =>
      for {
        _ <- IO(requestCounter.incrementAndGet())
        body <- req.as[String]
        response <- Ok(StatusResponse(
          method = req.method.name,
          url = req.uri.toString,
          headers = req.headers.headers.map(h => h.name.toString -> h.value).toMap,
          origin = req.remoteAddr.map(_.toString).getOrElse("unknown"),
          timestamp = Instant.now().toString
        ).asJson)
      } yield response

    // ヘッダー情報を返す
    case req @ GET -> Root / "headers" =>
      IO(requestCounter.incrementAndGet()) *>
      Ok(Map(
        "headers" -> req.headers.headers.map(h => h.name.toString -> h.value).toMap
      ).asJson)

    // ランダムな遅延(0-5秒)
    case req @ GET -> Root / "random-delay" =>
      for {
        _ <- IO(requestCounter.incrementAndGet())
        _ <- IO(activeConnections.incrementAndGet())
        delay <- random.nextIntBounded(6)
        _ <- IO.sleep(delay.seconds)
        _ <- IO(activeConnections.decrementAndGet())
        response <- Ok(Map(
          "delayed" -> delay.toString,
          "timestamp" -> Instant.now().toString
        ).asJson)
      } yield response

    // サーバー統計
    case GET -> Root / "stats" =>
      IO(requestCounter.incrementAndGet()) *>
      Ok(ServerStats(
        totalRequests = requestCounter.get(),
        activeConnections = activeConnections.get(),
        uptime = java.time.Duration.between(serverStartTime, Instant.now()).getSeconds,
        startTime = serverStartTime.toString
      ).asJson)

    // 遅いストリーミングレスポンス
    case GET -> Root / "slow-response" =>
      IO(requestCounter.incrementAndGet()) *>
      Ok(fs2.Stream
        .emits("This is a slow response... ".getBytes.toList)
        .repeat
        .take(1000)
        .covary[IO]
        .metered(100.milliseconds)
      )

    // コネクションテスト(接続を保持)
    case req @ GET -> Root / "connection-test" =>
      for {
        _ <- IO(requestCounter.incrementAndGet())
        _ <- IO(activeConnections.incrementAndGet())
        response <- Ok(s"Connection established. Active connections: ${activeConnections.get()}")
        // クライアントが接続を閉じるまで保持
        _ <- IO.unit
      } yield response

    // 404 Not Found
    case _ =>
      IO(requestCounter.incrementAndGet()) *>
      NotFound("Endpoint not found")
  }

  // メイン
  def run(args: List[String]): IO[ExitCode] = {
    val port = args.headOption.flatMap(_.toIntOption).getOrElse(8080)

    for {
      random <- Random.scalaUtilRandom[IO]

      // HTTPアプリケーションの構築
      httpApp = Logger.httpApp(
        logHeaders = true,
        logBody = false
      )(routes(random).orNotFound)

      // サーバーの起動
      _ <- EmberServerBuilder
        .default[IO]
        .withHost(ipv4"0.0.0.0")
        .withPort(Port.fromInt(port).getOrElse(port"8080"))
        .withHttpApp(httpApp)
        .withIdleTimeout(5.minutes)
        .withShutdownTimeout(30.seconds)
        .build
        .use { server =>
          IO.println(
            "Http4s Test Server Started!\n" +
            "==========================\n" +
            s"Listening on: ${server.address.getHostString}:${server.address.getPort}\n" +
            "\n" +
            "Example usage:\n" +
            s"  curl http://localhost:${server.address.getPort}/\n" +
            s"  curl http://localhost:${server.address.getPort}/delay/2\n" +
            s"  curl http://localhost:${server.address.getPort}/get\n" +
            s"  curl -X POST -d \"test data\" http://localhost:${server.address.getPort}/post\n" +
            "\n" +
            "Press Ctrl+C to stop the server"
          ) *>
          IO.never
        }
    } yield ExitCode.Success
  }
}

これにじゃぶじゃぶリクエストを飛ばす。自分のマシンだから良心が痛まない。

クライアント

以下のファイルをtoo-many.scalaに保存する:

//> using dep com.softwaremill.sttp.client4::core:4.0.8

import sttp.client4.*
import sttp.client4.httpclient.HttpClientSyncBackend
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Try, Success, Failure}
import java.net.http.HttpClient
import java.time.Duration

object TooManyFilesRepro {

  // パターン1: バックエンドを毎回作成して閉じない
  def pattern1_createBackendWithoutClose(): Unit = {
    println("\n=== パターン1: バックエンドを毎回作成して閉じない ===")

    var count = 0
    try {
      while (count < 5000) {
        // 新しいバックエンドを作成するが、closeしない(リーク!)
        val backend = HttpClientSyncBackend()
        val response = basicRequest
          .get(uri"http://localhost:8080/delay/0")
          .send(backend)

        count += 1
        if (count % 100 == 0) {
          println(s"リクエスト数: $count")
          // ファイルディスクリプタ数を表示(Linux/Mac)
          printFileDescriptorCount()
        }
      }
    } catch {
      case e: Exception =>
        println(s"エラー発生!リクエスト数: $count")
        println(s"エラー: ${e.getClass.getName}: ${e.getMessage}")
        e.printStackTrace()
    }
  }

  // パターン2: バックエンドを使いまわして大量リクエスト
  def pattern2_reuseBackend(): Unit = {
    println("\n=== パターン2: バックエンドを使いまわして大量リクエスト ===")

    val backend = HttpClientSyncBackend()

    var count = 0
    try {
      while (count < 5000) {
        // 同じバックエンドを使いまわす
        val response = basicRequest
          .get(uri"http://localhost:8080/delay/0")
          .send(backend)

        count += 1
        if (count % 100 == 0) {
          println(s"リクエスト数: $count")
          // ファイルディスクリプタ数を表示(Linux/Mac)
          printFileDescriptorCount()
        }
      }
    } catch {
      case e: Exception =>
        println(s"エラー発生!リクエスト数: $count")
        println(s"エラー: ${e.getClass.getName}: ${e.getMessage}")
        e.printStackTrace()
    } finally {
      backend.close()
    }
  }


  // ファイルディスクリプタ数を表示(Linux/Mac用)
  def printFileDescriptorCount(): Unit = {
    try {
      val pid = ProcessHandle.current().pid()
      val os = System.getProperty("os.name").toLowerCase()

      val command = if (os.contains("mac")) {
        Array("sh", "-c", s"lsof -p $pid | wc -l")
      } else if (os.contains("linux")) {
        Array("sh", "-c", s"ls /proc/$pid/fd | wc -l")
      } else {
        println("Windows環境ではファイルディスクリプタ数の取得はサポートされていません")
        return
      }

      val process = new ProcessBuilder(command*).start()
      val result = scala.io.Source.fromInputStream(process.getInputStream).mkString.trim
      println(s"現在のファイルディスクリプタ数: $result")
    } catch {
      case e: Exception =>
        println(s"ファイルディスクリプタ数の取得に失敗: ${e.getMessage}")
    }
  }

  // システム情報を表示
  def printSystemInfo(): Unit = {
    println("=== システム情報 ===")
    println(s"OS: ${System.getProperty("os.name")}")
    println(s"Java Version: ${System.getProperty("java.version")}")

    // ulimitの値を取得(Linux/Mac)
    try {
      val process = new ProcessBuilder("sh", "-c", "ulimit -n").start()
      val limit = scala.io.Source.fromInputStream(process.getInputStream).mkString.trim
      println(s"ファイルディスクリプタ制限 (ulimit -n): $limit")
    } catch {
      case _: Exception => println("ulimit情報の取得に失敗")
    }

    // 初期のファイルディスクリプタ数
    printFileDescriptorCount()
  }

  def main(args: Array[String]): Unit = {
    printSystemInfo()

    val pattern = args.headOption.getOrElse("1")

    pattern match {
      case "1" => pattern1_createBackendWithoutClose()
      case "2" => pattern2_reuseBackend()
      case "all" =>
        pattern1_createBackendWithoutClose()
        System.gc()
        Thread.sleep(5000)
        pattern2_reuseBackend()
      case _ =>
        println("""
          |使用方法: scala-cli run TooManyFilesRepro.scala -- [パターン番号]
          |
          |パターン:
          |  1 - バックエンドを毎回作成して閉じない(デフォルト)
          |  2 - バックエンドを使いまわして大量リクエスト
          |  all - すべてのパターンを実行
          |""".stripMargin)
    }
  }
}
★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?