ここではsttp4前提で書くがsttp3でもたぶん同じ。
sttpについて説明しておくと、Scalaで利用できるHTTPクライアント実装で、操作体系がシンプルで簡単であり、なおかつ複数のバックエンド(JDK標準のクライアントを使うものとか、もっと並行性制御を強化されたCats Effectなどの上に載ったhttp4sなどを使うものとか、Native環境でも動くlibcurlベースのものなど)を切り替えて透過的に利用できるのが強みだ。現代でHTTPクライアントが必要になったらとりあえずファーストチョイスにしてよいのではないか。
で、ScalaのHTTPクライアントsttpではバックエンドを都度作らずに1つだけ作って使いまわしましょう、という話。
守らないとどうなりますか
java.io.IOException: "Too many open files"
などの比較的致命的なエラーが発生する。
以下のように書きましょう
lazy val
などを使って使いまわそう。そしてJVMが死ぬときにバックエンドが閉じるようにしよう。
import sttp.client4.* import sttp.client4.httpclient.HttpClientSyncBackend object HttpClientManager { private lazy val backend = HttpClientSyncBackend() def getBackend = backend def shutdown(): Unit = backend.close() sys.addShutdownHook(shutdown()) } object FooBarService { def fetch() = { val backend = HttpClientManager.getBackend val response = basicRequest .get(uri"http://example.com/") .send(backend) } }
悪い例: fetchするごとにバックエンドを作成する
import sttp.client4.* import sttp.client4.httpclient.HttpClientSyncBackend object FooBarService { def fetch() = { val backend = HttpClientSyncBackend() val response = basicRequest .get(uri"http://example.com/") .send(backend) } }
なんで?
sttpはヘヴィーユースに耐えられるようにするために、コネクションに関連するリソース、例えばスレッドプールやコネクションプールなどをバックエンド単位で管理する。このおかげで、長期間または連続してHTTPリクエストを実行しても、リソースを節約しつつパフォーマンスの良いHTTPリクエストを実現できる。
HttpClientSyncBackend
などを毎回生成すると、こうしたリソースも毎回作ることになってしまう。以下に示す実験コードを実行すると、どんどんファイルディスクリプタが増えることがわかる(たまにGCで回収されている)。これの回収が間に合わなくなると、カーネルのファイルディスクリプタ制限に引っかかってクラッシュする。
他言語のライブラリの仕組みがどうなっているかについて私は詳しくないのだが、すくなくともsttpでは「暗黙のうちにグローバルなコネクションプールが作られ、そこから暗黙的にコネクションが取り出される」ような挙動を行わない。性能やリソースを制御しにくくなるためだ。
毎回バックエンド作らないでね、ということはドキュメントで明確に指示されている:
In case of most backends, you should only instantiate a backend once per application, as a backend typically allocates resources such as thread or connection pools.
検証してみようのコーナー
せっかくなので検証してみよう。バックエンドを都度作る(GCによって回収されるまで放置される)ものと、同じバックエンドを使いまわすものを順に走らせ、ダミーのサーバに大量にリクエストを発射する。
結果は以下のような感じ(テストプログラムは後述)。
% scala-cli package too-many.scala % scala-cli package http4s-test-server.scala
# サーバ起動 # どんどんリクエストが来るのでレイテンシが小さいGCにしておく export JAVA_OPTS='-Xmx4G -XX:+UseShenandoahGC -XX:+UseStringDeduplication' % ./Http4sTestServer
# テストプログラム起動 % ./TooMany all === システム情報 === OS: Linux Java Version: 24.0.1 ファイルディスクリプタ制限 (ulimit -n): 524288 現在のファイルディスクリプタ数: 15 === パターン1: バックエンドを毎回作成して閉じない === リクエスト数: 100 現在のファイルディスクリプタ数: 318 リクエスト数: 200 現在のファイルディスクリプタ数: 204 ... リクエスト数: 5000 現在のファイルディスクリプタ数: 1491 === パターン2: バックエンドを使いまわして大量リクエスト === リクエスト数: 100 現在のファイルディスクリプタ数: 21 リクエスト数: 200 現在のファイルディスクリプタ数: 21 ... リクエスト数: 5000 現在のファイルディスクリプタ数: 21
バックエンドを毎回作るとどんどんファイルディスクリプタが増えるのがわかるだろう。
サーバ
ダミーのサーバをClaudeくんに作ってもらった。http4s-test-server.scala
として保存する。所々雑な気がする(active connの計算とか)けど本筋とは関係ないので見なかったことにする。
//> using dep org.http4s::http4s-ember-server:0.23.30 //> using dep org.http4s::http4s-ember-client:0.23.30 //> using dep org.http4s::http4s-dsl:0.23.30 //> using dep org.http4s::http4s-circe:0.23.30 //> using dep io.circe::circe-generic:0.14.13 import cats.effect.* import cats.effect.std.Random import cats.syntax.all.* import com.comcast.ip4s.* import org.http4s.* import org.http4s.dsl.io.* import org.http4s.ember.server.* import org.http4s.implicits.* import org.http4s.server.middleware.Logger import org.http4s.circe.* import io.circe.generic.auto.* import io.circe.syntax.* import scala.concurrent.duration.* import java.time.Instant import java.util.concurrent.atomic.AtomicLong object Http4sTestServer extends IOApp { // レスポンス用のケースクラス case class StatusResponse( method: String, url: String, headers: Map[String, String], origin: String, timestamp: String ) case class DelayResponse( delayed: Int, method: String, url: String, timestamp: String ) case class ServerStats( totalRequests: Long, activeConnections: Long, uptime: Long, startTime: String ) // リクエストカウンター val requestCounter = new AtomicLong(0) val activeConnections = new AtomicLong(0) val serverStartTime = Instant.now() // ルート定義 def routes(random: Random[IO]): HttpRoutes[IO] = HttpRoutes.of[IO] { // ルートエンドポイント case req @ GET -> Root => Ok(s""" |Http4s Test Server (httpbin alternative) |======================================== | |Available endpoints: | GET / - This help message | GET /delay/:seconds - Delay response by N seconds | GET /status/:code - Return specific HTTP status code | GET /get - Return request details | POST /post - Echo POST data | GET /headers - Return request headers | GET /random-delay - Random delay between 0-5 seconds | GET /stats - Server statistics | GET /slow-response - Slow streaming response | GET /connection-test - Test connection handling | |Total requests served: ${requestCounter.get()} |Active connections: ${activeConnections.get()} |""".stripMargin) // 遅延レスポンス case req @ GET -> Root / "delay" / IntVar(seconds) => val delay = math.min(seconds, 300) // 最大5分 for { _ <- IO(requestCounter.incrementAndGet()) _ <- IO(activeConnections.incrementAndGet()) _ <- IO.sleep(delay.seconds) _ <- IO(activeConnections.decrementAndGet()) response <- Ok(DelayResponse( delayed = delay, method = req.method.name, url = req.uri.toString, timestamp = Instant.now().toString ).asJson) } yield response // 特定のステータスコードを返す case req @ GET -> Root / "status" / IntVar(code) => for { _ <- IO(requestCounter.incrementAndGet()) result <- Status.fromInt(code) match { case Right(status) => IO.pure(Response[IO](status = status).withEntity(s"Status code: $code")) case Left(_) => BadRequest(s"Invalid status code: $code") } } yield result // GETリクエストの詳細を返す case req @ GET -> Root / "get" => IO(requestCounter.incrementAndGet()) *> Ok(StatusResponse( method = req.method.name, url = req.uri.toString, headers = req.headers.headers.map(h => h.name.toString -> h.value).toMap, origin = req.remoteAddr.map(_.toString).getOrElse("unknown"), timestamp = Instant.now().toString ).asJson) // POSTデータをエコー case req @ POST -> Root / "post" => for { _ <- IO(requestCounter.incrementAndGet()) body <- req.as[String] response <- Ok(StatusResponse( method = req.method.name, url = req.uri.toString, headers = req.headers.headers.map(h => h.name.toString -> h.value).toMap, origin = req.remoteAddr.map(_.toString).getOrElse("unknown"), timestamp = Instant.now().toString ).asJson) } yield response // ヘッダー情報を返す case req @ GET -> Root / "headers" => IO(requestCounter.incrementAndGet()) *> Ok(Map( "headers" -> req.headers.headers.map(h => h.name.toString -> h.value).toMap ).asJson) // ランダムな遅延(0-5秒) case req @ GET -> Root / "random-delay" => for { _ <- IO(requestCounter.incrementAndGet()) _ <- IO(activeConnections.incrementAndGet()) delay <- random.nextIntBounded(6) _ <- IO.sleep(delay.seconds) _ <- IO(activeConnections.decrementAndGet()) response <- Ok(Map( "delayed" -> delay.toString, "timestamp" -> Instant.now().toString ).asJson) } yield response // サーバー統計 case GET -> Root / "stats" => IO(requestCounter.incrementAndGet()) *> Ok(ServerStats( totalRequests = requestCounter.get(), activeConnections = activeConnections.get(), uptime = java.time.Duration.between(serverStartTime, Instant.now()).getSeconds, startTime = serverStartTime.toString ).asJson) // 遅いストリーミングレスポンス case GET -> Root / "slow-response" => IO(requestCounter.incrementAndGet()) *> Ok(fs2.Stream .emits("This is a slow response... ".getBytes.toList) .repeat .take(1000) .covary[IO] .metered(100.milliseconds) ) // コネクションテスト(接続を保持) case req @ GET -> Root / "connection-test" => for { _ <- IO(requestCounter.incrementAndGet()) _ <- IO(activeConnections.incrementAndGet()) response <- Ok(s"Connection established. Active connections: ${activeConnections.get()}") // クライアントが接続を閉じるまで保持 _ <- IO.unit } yield response // 404 Not Found case _ => IO(requestCounter.incrementAndGet()) *> NotFound("Endpoint not found") } // メイン def run(args: List[String]): IO[ExitCode] = { val port = args.headOption.flatMap(_.toIntOption).getOrElse(8080) for { random <- Random.scalaUtilRandom[IO] // HTTPアプリケーションの構築 httpApp = Logger.httpApp( logHeaders = true, logBody = false )(routes(random).orNotFound) // サーバーの起動 _ <- EmberServerBuilder .default[IO] .withHost(ipv4"0.0.0.0") .withPort(Port.fromInt(port).getOrElse(port"8080")) .withHttpApp(httpApp) .withIdleTimeout(5.minutes) .withShutdownTimeout(30.seconds) .build .use { server => IO.println( "Http4s Test Server Started!\n" + "==========================\n" + s"Listening on: ${server.address.getHostString}:${server.address.getPort}\n" + "\n" + "Example usage:\n" + s" curl http://localhost:${server.address.getPort}/\n" + s" curl http://localhost:${server.address.getPort}/delay/2\n" + s" curl http://localhost:${server.address.getPort}/get\n" + s" curl -X POST -d \"test data\" http://localhost:${server.address.getPort}/post\n" + "\n" + "Press Ctrl+C to stop the server" ) *> IO.never } } yield ExitCode.Success } }
これにじゃぶじゃぶリクエストを飛ばす。自分のマシンだから良心が痛まない。
クライアント
以下のファイルをtoo-many.scala
に保存する:
//> using dep com.softwaremill.sttp.client4::core:4.0.8 import sttp.client4.* import sttp.client4.httpclient.HttpClientSyncBackend import java.util.concurrent.{Executors, TimeUnit} import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.DurationInt import scala.util.{Try, Success, Failure} import java.net.http.HttpClient import java.time.Duration object TooManyFilesRepro { // パターン1: バックエンドを毎回作成して閉じない def pattern1_createBackendWithoutClose(): Unit = { println("\n=== パターン1: バックエンドを毎回作成して閉じない ===") var count = 0 try { while (count < 5000) { // 新しいバックエンドを作成するが、closeしない(リーク!) val backend = HttpClientSyncBackend() val response = basicRequest .get(uri"http://localhost:8080/delay/0") .send(backend) count += 1 if (count % 100 == 0) { println(s"リクエスト数: $count") // ファイルディスクリプタ数を表示(Linux/Mac) printFileDescriptorCount() } } } catch { case e: Exception => println(s"エラー発生!リクエスト数: $count") println(s"エラー: ${e.getClass.getName}: ${e.getMessage}") e.printStackTrace() } } // パターン2: バックエンドを使いまわして大量リクエスト def pattern2_reuseBackend(): Unit = { println("\n=== パターン2: バックエンドを使いまわして大量リクエスト ===") val backend = HttpClientSyncBackend() var count = 0 try { while (count < 5000) { // 同じバックエンドを使いまわす val response = basicRequest .get(uri"http://localhost:8080/delay/0") .send(backend) count += 1 if (count % 100 == 0) { println(s"リクエスト数: $count") // ファイルディスクリプタ数を表示(Linux/Mac) printFileDescriptorCount() } } } catch { case e: Exception => println(s"エラー発生!リクエスト数: $count") println(s"エラー: ${e.getClass.getName}: ${e.getMessage}") e.printStackTrace() } finally { backend.close() } } // ファイルディスクリプタ数を表示(Linux/Mac用) def printFileDescriptorCount(): Unit = { try { val pid = ProcessHandle.current().pid() val os = System.getProperty("os.name").toLowerCase() val command = if (os.contains("mac")) { Array("sh", "-c", s"lsof -p $pid | wc -l") } else if (os.contains("linux")) { Array("sh", "-c", s"ls /proc/$pid/fd | wc -l") } else { println("Windows環境ではファイルディスクリプタ数の取得はサポートされていません") return } val process = new ProcessBuilder(command*).start() val result = scala.io.Source.fromInputStream(process.getInputStream).mkString.trim println(s"現在のファイルディスクリプタ数: $result") } catch { case e: Exception => println(s"ファイルディスクリプタ数の取得に失敗: ${e.getMessage}") } } // システム情報を表示 def printSystemInfo(): Unit = { println("=== システム情報 ===") println(s"OS: ${System.getProperty("os.name")}") println(s"Java Version: ${System.getProperty("java.version")}") // ulimitの値を取得(Linux/Mac) try { val process = new ProcessBuilder("sh", "-c", "ulimit -n").start() val limit = scala.io.Source.fromInputStream(process.getInputStream).mkString.trim println(s"ファイルディスクリプタ制限 (ulimit -n): $limit") } catch { case _: Exception => println("ulimit情報の取得に失敗") } // 初期のファイルディスクリプタ数 printFileDescriptorCount() } def main(args: Array[String]): Unit = { printSystemInfo() val pattern = args.headOption.getOrElse("1") pattern match { case "1" => pattern1_createBackendWithoutClose() case "2" => pattern2_reuseBackend() case "all" => pattern1_createBackendWithoutClose() System.gc() Thread.sleep(5000) pattern2_reuseBackend() case _ => println(""" |使用方法: scala-cli run TooManyFilesRepro.scala -- [パターン番号] | |パターン: | 1 - バックエンドを毎回作成して閉じない(デフォルト) | 2 - バックエンドを使いまわして大量リクエスト | all - すべてのパターンを実行 |""".stripMargin) } } }