Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

マネーフォワードのCSVをParquetに変換する

いろいろと分析したいので、マネーフォワードでエクスポートできるCSVファイルをSparkを使ってParquetに変換したメモ。

マネーフォワード

特に今更説明する必要もないが、マネーフォワードはオンラインで動作する家計簿サービス。カードと連携したり、勝手に科目を付けてくれたりするのでとても便利だ。自分はプレミアム会員でエンジョイしている。

自分はSBIネット銀行なので、それ専用のマネーフォワードを使っている。

ssnb.x.moneyforward.com

プレミアム会員になると、マネーフォワードは特定の月の家計簿をCSV/Excel形式でエクスポートできる。これを使って、あとでGrafanaとかKibanaで眺めてみようというのが最終的な目標。今回はParquetに変換するだけ。

CSVは以下のようなフォーマットになっている:

"計算対象","日付","内容","金額(円)","保有金融機関","大項目","中項目","メモ","振替","ID"
"1","2022/02/24","PAYPAL *UBERBV EATS","-3020","UCカード","食費","外食","","0","abcdef"
...

今回、1年分をしこしこ手元にダウンロードしておいた。

Apache Parquet

Parquetは、データを入れるファイル形式である。めちゃくちゃざっくり言うと、スキーマがついている、オープンで、高速で、互換性があるCSVである。

パケットと発音すればよいらしい。カラム(列)指向のフォーマットで、列単位での高速処理が得意。大抵のデータ基盤やライブラリで対応しているので、マスターデータとしてこの形式に変換しておくと後々使いやすく、おすすめ。スキーマも定義できるので、CSVのように値の扱いに困ることがないというメリットがある。また、データ圧縮にも対応している。今回はZStandardで圧縮する。欠点は、CSVみたいにPerlでガッとテキストベースで処理するようなことができないこと。

parquet.apache.org

blog.3qe.us

Apache Spark

Sparkは以前このブログでもしばしば紹介しているが、Scalaで動く分散データ処理ソフトウェア。分散型アーキテクチャだが、ローカルでも普通に動くのでPandasやNumpyの代わりに使うことができる。分散型なのでちょっと起動が遅いのがたまにきず。

blog.3qe.us

例によって、Scala CLIを使ってスクリプトを書き、依存性の解決とかは全部Scala CLIにやらせる。ここばかりはPythonよりもすごいと思う。

to-parquet.scala.scに以下のスクリプトを保存する:

//> using scala "2.13"
//> using lib "org.apache.spark::spark-core:3.4.0"
//> using lib "org.apache.spark::spark-sql:3.4.0"

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Sparkエンジンを起動する
val spark = SparkSession
  .builder()
  .appName("to-parquet")
  .master("local[*]")
  .config("spark.ui.enabled", false)
  .getOrCreate()

// スキーマを手で定義してやる
val schema = StructType(
  Seq(
    StructField("計算対象", IntegerType, nullable = false),
    StructField("日付", DateType, nullable = false),
    StructField("内容", StringType, nullable = false),
    StructField("金額(円)", IntegerType, nullable = false),
    StructField("保有金融機関", StringType, nullable = false),
    StructField("大項目", StringType, nullable = false),
    StructField("中項目", StringType, nullable = false),
    StructField("メモ", StringType, nullable = true),
    StructField("振替", IntegerType, nullable = false),
    StructField("ID", StringType, nullable = false)
  )
)

// 引数に与えられたCSVファイルをそれぞれ読み取り、Parquet形式で保存しなおす
for (arg <- args) {
  // CSVを読んでスキーマを適用し、データフレームを構成する
  val df = spark.read
    .option("header", true)
    .schema(schema)
    .option("dateFormat", "yyyy/MM/dd") // 日付形式のフォーマットを指定するとDateTimeとして読み取れるようになる(デフォルトだとyyyy-MM-dd)
    .option("encoding", "shift_jis") // SparkはShift JISに対応している
    .csv(arg)

  // df.show()
  // df.printSchema()

  val saveFile = s"${arg}.parquet"
  df.coalesce(1) // データフレームが大きいとファイルが分割されるので1つにまとめる
    .write
    .option("encoding", "UTF-8") // 無くてもいいと思うけどUTF-8で保存することを指示する
    .option("compression", "zstd") // zstdで圧縮する
    .mode("overwrite") // 既に出力先がある場合は上書きする
    .parquet(saveFile)
}

このスクリプトをscala-cliを使って呼び出すと、CSVファイルをもとにParquet形式のファイルが出力される。

$ scala-cli ./to-parquet.scala.sc -- foo.csv
...
$ ls
foo.csv
foo.csv.parquet/

ただしSparkは分散処理基盤なので、ノードが分散している前提で設計されている。そういう都合で、Parquetを出力するとディレクトリにParquetファイルが入っているという出力のされ方になる。

そこで、シェルスクリプトを使ってリネームなどをやってもらう:

#!/bin/sh
# sh ./convert.sh *.csv
set -uex

# parquetを出力する
scala-cli ./to-parquet.scala.sc -- $@

# CSVはもう不要なのでorigcsvに移動し、書き込み保護状態にする
mkdir -p origcsv/
chmod 440 $@
mv $@ origcsv/

# SparkはParquetをディレクトリに出力するので取り出し、ディレクトリを削除する
move () {
    # 元々のファイル名が 収入・支出詳細_2022-01-25_2022-02-24.csv のような形式なので修正する
    file=$(echo "$1" | perl -pe 's/^.*?_//')
    # 一時ファイルに逃がす
    pqfile=$(mktemp XXXX.parquet)
    mv $1.parquet/*.parquet $pqfile
    # ディレクトリを消してファイル名を戻す
    rm -rf $1.parquet
    mv $pqfile $file.parquet
}

while (($#)) ; do
    move $1
    shift
done

これで、 ./convert.sh *.csvを実行すると全てのCSVファイルがParquet形式に変換される。CSVファイルはアーカイブに移動される。

Pandasで閲覧してみる

せっかくなのでParquetが他のプラットフォームから読めるか確認してみる。今回はPythonのPandasから読めるか確認してみよう。

# show_parquet.py
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import sys

df = pd.read_parquet(sys.argv[1])
print(df)
table = pa.Table.from_pandas(df)
print(table)

実行してみよう:

$ pip3 install pandas pyarrow
$ python3 show_parquet.py foo.parquet
    計算対象          日付                                       内容   金額(円)           保有金融機関    大項目        中項目    メモ  振替                      ID
0      1  2022-02-24                      PAYPAL *UBERBV EATS   -3020            UCカード     食費         外食  None   0  abcdef
...

[90 rows x 10 columns]
pyarrow.Table
計算対象: int32
日付: date32[day]
内容: string
金額(円): int32
保有金融機関: string
大項目: string
中項目: string
メモ: null
振替: int32
ID: string
----
計算対象: [[1,1,1,1,1,...,1,0,1,1,1]]
日付: [[2022-02-24,...]]
内容: [["PAYPAL *UBERBV EATS",...]]
金額(円): [[-3020,...]]
保有金融機関: [["UCカード",...]]
大項目: [["食費",...]
中項目: [["外食",...]]
メモ: [90 nulls]
振替: [[0,...]]
ID: [["abcdef",...]]

しっかり読めているようだ。

Pyarrowを使ったParquetへの変換

ぶっちゃけ変換するだけならPyarrowを使うだけでも良い。しかし今回はちょっとスキーマを書きたかったし、型がほしかったのでScalaでやることにした。複雑なことになると結局型がないと辛い。

dev.classmethod.jp

つづく

次回はEmbulkでElasticSearchとかにデータをロードすることになると思う。

★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?