マニアックなネタだが割とハマったのでメモっておく。
Google Cloud Storage
詳細は割愛するが、要するにAWS S3のGoogle版である
Node.js SDK
Node.jsにはGoogleまわりのSDKが提供されていて、Cloud Storageを操作するためのSDKも用意されている。
これを使うと、簡単にCloud Storage上のファイルをローカルにダウンロードしたり、移動や削除もできる。
また、これをCloud Functionsなどから呼び出せば面倒な認証・認可まわりの処理を勝手にプラットフォーム側でやってもらえるので非常に楽。
ストリーミングAPI
ところでNode.jsにはストリーミング処理を扱うための諸々の標準ライブラリが用意されている。
そして嬉しいことに、SDKもこれに対応したcreateReadStream
といったAPIを用意しているため、Cloud Storage上のファイルをストリーミングしてファイルにダウンロードといったことが可能だ。
今回はこのストリーミングAPIを使って、一定量ずつファイルを取り出しつつ変換を加え、別のファイルとしてアップロードしたい。題材として、最近よくいじっているSRTファイルをJSON Linesに変換する、ということをやってみる。
けっこうこのNode.jsにおけるストリーミング処理というのが癖があって(個人の感想です)、成功するまで何度も何度もいじらなければならなかった。
コード
いきなりだがコードを貼る。
import * as ff from '@google-cloud/functions-framework'; import { Storage } from "@google-cloud/storage"; import split from 'split2'; // splitのstream版ライブラリ import st from 'stream'; type Body = { cloudStorageUrl: string; // リクエストとしてGCS上のgs://で表現されるURLを受け取る } ff.http('ほげふがFunction', async (req: ff.Request, res: ff.Response) => { const body = req.body as Body; const gsUri = new URL(body.cloudStorageUrl); const path = gsUri.pathname.substring(1); const bucketName = gsUri.hostname; await processing(bucketName, path); res.send(`ok`); }); const processing: (bucketName: string, path: string) => Promise<unknown> = async (bucketName: string, path: string) => { const storage = new Storage(); const bucket = storage.bucket(bucketName); const srtStream = bucket.file(path).createReadStream(); // ポイント1: リジュームする必要が無い場合resumable:falseにするとパフォーマンスが良くなる const srtJsonLStream = bucket.file(`${path}.jsonl`).createWriteStream({ resumable: false }); return await runStream(srtStream, srtJsonLStream); }; // 出力するJSONの構造 type SRTRecord = { id: number; start: number; end: number; text: string; }; // ストリーム処理の本体 const runStream = async (input: NodeJS.ReadableStream, output: NodeJS.WritableStream) => { // srt形式は2つの改行で1レコードになる const SEP = "\n\n"; const p = new Promise((resolve, reject) => { // inputを\n\nでsplitしてレコードごとに処理し、outputに流す const stream = input.pipe(split(SEP)).pipe(tr).pipe(output); stream.on('finish', () => { console.log('done!'); resolve('ok'); }); // ポイント2: pipeした後の全体のstreamに対して(inputに対してではない)'finish'イベントをリッスンさせる input.on('error', (err) => { console.error(err); reject(err); }); stream.on('error', (err) => { console.error(err); reject(err); }); }); return await p; }; // コアとなる変換部。Transformを作ってその中にチャンク(ここでは\n\nで区切られた単位がチャンクになっている)ごとの処理を書き、 // 処理結果をcallbackに渡す必要がある const tr = new st.Transform({ transform(chunk, encoding, callback) { const [id, time, text] = chunk.toString().split("\n"); // ポイント3: 返すべきデータがないときは空引数でcallbackを呼び出さなければ詰まる if (id === "" || time === "" || text === "") return callback(); const [start, end] = time.split(" --> ").map((t: string) => { // TODO: comma const [h, m, s] = t.split(":").map((t) => parseFloat(t)); return h * 3600 + m * 60 + s; }); const result: SRTRecord = { id: parseInt(id), start, end, text }; console.debug(JSON.stringify(result)); callback(null, JSON.stringify(result) + "\n"); } });
いちばん詰まったのがポイント3で、Transform
の中では必ずcallback
を呼び出さなければならない。これがないとより上位のレイヤーでは「まだ処理中なのか」となって処理が終わらなくなってしまう。
ポイントをまとめると:
- inputからpipeして目当てのtransformにつなぎ、最後にoutputまでつなぐ
- outputまでつなぎきった瞬間に走りだす
- 終了するまで待つということが標準ではできないので、pipeして繋いだ結果が
finish
イベントを出すのを監視するPromise
でラップしてfinish
したらresolve
する仕掛けを作る- エラー吐いたら
reject
する
- 理由がなければ書き込むときのストリームのオプションは
resumable: false
でいい
ストリーミング処理は普段とはまた違う感覚が求められるのですが、慣れると一時ファイルなどを置かずにすむので高効率です。