Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

Node.jsでGoogle Cloud Storage上のファイルからファイルへ、変換しつつ保存する

マニアックなネタだが割とハマったのでメモっておく。

Google Cloud Storage

詳細は割愛するが、要するにAWS S3のGoogle版である

Node.js SDK

Node.jsにはGoogleまわりのSDKが提供されていて、Cloud Storageを操作するためのSDKも用意されている。

cloud.google.com

これを使うと、簡単にCloud Storage上のファイルをローカルにダウンロードしたり、移動や削除もできる。

また、これをCloud Functionsなどから呼び出せば面倒な認証・認可まわりの処理を勝手にプラットフォーム側でやってもらえるので非常に楽。

ストリーミングAPI

ところでNode.jsにはストリーミング処理を扱うための諸々の標準ライブラリが用意されている。

nodejs.org

そして嬉しいことに、SDKもこれに対応したcreateReadStreamといったAPIを用意しているため、Cloud Storage上のファイルをストリーミングしてファイルにダウンロードといったことが可能だ。

cloud.google.com

今回はこのストリーミングAPIを使って、一定量ずつファイルを取り出しつつ変換を加え、別のファイルとしてアップロードしたい。題材として、最近よくいじっているSRTファイルをJSON Linesに変換する、ということをやってみる。

ja.wikipedia.org

けっこうこのNode.jsにおけるストリーミング処理というのが癖があって(個人の感想です)、成功するまで何度も何度もいじらなければならなかった。

コード

いきなりだがコードを貼る。

import * as ff from '@google-cloud/functions-framework';
import { Storage } from "@google-cloud/storage";
import split from 'split2'; // splitのstream版ライブラリ
import st from 'stream';

type Body = {
  cloudStorageUrl: string; // リクエストとしてGCS上のgs://で表現されるURLを受け取る
}

ff.http('ほげふがFunction', async (req: ff.Request, res: ff.Response) => {
  const body = req.body as Body;
  const gsUri = new URL(body.cloudStorageUrl);
  const path = gsUri.pathname.substring(1);
  const bucketName = gsUri.hostname;
  await processing(bucketName, path);
  res.send(`ok`);
});

const processing: (bucketName: string, path: string) => Promise<unknown> =
  async (bucketName: string, path: string) => {
    const storage = new Storage();
    const bucket = storage.bucket(bucketName);
    const srtStream = bucket.file(path).createReadStream();
    // ポイント1: リジュームする必要が無い場合resumable:falseにするとパフォーマンスが良くなる
    const srtJsonLStream = bucket.file(`${path}.jsonl`).createWriteStream({ resumable: false });

    return await runStream(srtStream, srtJsonLStream);
  };

// 出力するJSONの構造
type SRTRecord = {
  id: number;
  start: number;
  end: number;
  text: string;
};

// ストリーム処理の本体
const runStream = async (input: NodeJS.ReadableStream, output: NodeJS.WritableStream) => {
  // srt形式は2つの改行で1レコードになる
  const SEP = "\n\n";
  const p = new Promise((resolve, reject) => {
    // inputを\n\nでsplitしてレコードごとに処理し、outputに流す
    const stream = input.pipe(split(SEP)).pipe(tr).pipe(output);
    stream.on('finish', () => {
      console.log('done!');
      resolve('ok');
    }); // ポイント2: pipeした後の全体のstreamに対して(inputに対してではない)'finish'イベントをリッスンさせる
    input.on('error', (err) => {
      console.error(err);
      reject(err);
    });
    stream.on('error', (err) => {
      console.error(err);
      reject(err);
    });
  });
  return await p;
};

// コアとなる変換部。Transformを作ってその中にチャンク(ここでは\n\nで区切られた単位がチャンクになっている)ごとの処理を書き、
// 処理結果をcallbackに渡す必要がある
const tr = new st.Transform({
  transform(chunk, encoding, callback) {
    const [id, time, text] = chunk.toString().split("\n");
    // ポイント3: 返すべきデータがないときは空引数でcallbackを呼び出さなければ詰まる
    if (id === "" || time === "" || text === "") return callback();
    const [start, end] = time.split(" --> ").map((t: string) => {
      // TODO: comma
      const [h, m, s] = t.split(":").map((t) => parseFloat(t));
      return h * 3600 + m * 60 + s;
    });
    const result: SRTRecord = { id: parseInt(id), start, end, text };
    console.debug(JSON.stringify(result));
    callback(null, JSON.stringify(result) + "\n");
  }
});

いちばん詰まったのがポイント3で、Transformの中では必ずcallbackを呼び出さなければならない。これがないとより上位のレイヤーでは「まだ処理中なのか」となって処理が終わらなくなってしまう。

ポイントをまとめると:

  • inputからpipeして目当てのtransformにつなぎ、最後にoutputまでつなぐ
    • outputまでつなぎきった瞬間に走りだす
  • 終了するまで待つということが標準ではできないので、pipeして繋いだ結果がfinishイベントを出すのを監視する
    • Promiseでラップしてfinishしたらresolveする仕掛けを作る
    • エラー吐いたらrejectする
  • 理由がなければ書き込むときのストリームのオプションはresumable: falseでいい

ストリーミング処理は普段とはまた違う感覚が求められるのですが、慣れると一時ファイルなどを置かずにすむので高効率です。

★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?