いろいろと分析したいので、マネーフォワードでエクスポートできるCSVファイルをSparkを使ってParquetに変換したメモ。
マネーフォワード
特に今更説明する必要もないが、マネーフォワードはオンラインで動作する家計簿サービス。カードと連携したり、勝手に科目を付けてくれたりするのでとても便利だ。自分はプレミアム会員でエンジョイしている。
自分はSBIネット銀行なので、それ専用のマネーフォワードを使っている。
プレミアム会員になると、マネーフォワードは特定の月の家計簿をCSV/Excel形式でエクスポートできる。これを使って、あとでGrafanaとかKibanaで眺めてみようというのが最終的な目標。今回はParquetに変換するだけ。
CSVは以下のようなフォーマットになっている:
"計算対象","日付","内容","金額(円)","保有金融機関","大項目","中項目","メモ","振替","ID" "1","2022/02/24","PAYPAL *UBERBV EATS","-3020","UCカード","食費","外食","","0","abcdef" ...
今回、1年分をしこしこ手元にダウンロードしておいた。
Apache Parquet
Parquetは、データを入れるファイル形式である。めちゃくちゃざっくり言うと、スキーマがついている、オープンで、高速で、互換性があるCSVである。
パケットと発音すればよいらしい。カラム(列)指向のフォーマットで、列単位での高速処理が得意。大抵のデータ基盤やライブラリで対応しているので、マスターデータとしてこの形式に変換しておくと後々使いやすく、おすすめ。スキーマも定義できるので、CSVのように値の扱いに困ることがないというメリットがある。また、データ圧縮にも対応している。今回はZStandardで圧縮する。欠点は、CSVみたいにPerlでガッとテキストベースで処理するようなことができないこと。
Apache Spark
Sparkは以前このブログでもしばしば紹介しているが、Scalaで動く分散データ処理ソフトウェア。分散型アーキテクチャだが、ローカルでも普通に動くのでPandasやNumpyの代わりに使うことができる。分散型なのでちょっと起動が遅いのがたまにきず。
例によって、Scala CLIを使ってスクリプトを書き、依存性の解決とかは全部Scala CLIにやらせる。ここばかりはPythonよりもすごいと思う。
to-parquet.scala.sc
に以下のスクリプトを保存する:
//> using scala "2.13" //> using lib "org.apache.spark::spark-core:3.4.0" //> using lib "org.apache.spark::spark-sql:3.4.0" import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ // Sparkエンジンを起動する val spark = SparkSession .builder() .appName("to-parquet") .master("local[*]") .config("spark.ui.enabled", false) .getOrCreate() // スキーマを手で定義してやる val schema = StructType( Seq( StructField("計算対象", IntegerType, nullable = false), StructField("日付", DateType, nullable = false), StructField("内容", StringType, nullable = false), StructField("金額(円)", IntegerType, nullable = false), StructField("保有金融機関", StringType, nullable = false), StructField("大項目", StringType, nullable = false), StructField("中項目", StringType, nullable = false), StructField("メモ", StringType, nullable = true), StructField("振替", IntegerType, nullable = false), StructField("ID", StringType, nullable = false) ) ) // 引数に与えられたCSVファイルをそれぞれ読み取り、Parquet形式で保存しなおす for (arg <- args) { // CSVを読んでスキーマを適用し、データフレームを構成する val df = spark.read .option("header", true) .schema(schema) .option("dateFormat", "yyyy/MM/dd") // 日付形式のフォーマットを指定するとDateTimeとして読み取れるようになる(デフォルトだとyyyy-MM-dd) .option("encoding", "shift_jis") // SparkはShift JISに対応している .csv(arg) // df.show() // df.printSchema() val saveFile = s"${arg}.parquet" df.coalesce(1) // データフレームが大きいとファイルが分割されるので1つにまとめる .write .option("encoding", "UTF-8") // 無くてもいいと思うけどUTF-8で保存することを指示する .option("compression", "zstd") // zstdで圧縮する .mode("overwrite") // 既に出力先がある場合は上書きする .parquet(saveFile) }
このスクリプトをscala-cli
を使って呼び出すと、CSVファイルをもとにParquet形式のファイルが出力される。
$ scala-cli ./to-parquet.scala.sc -- foo.csv ... $ ls foo.csv foo.csv.parquet/
ただしSparkは分散処理基盤なので、ノードが分散している前提で設計されている。そういう都合で、Parquetを出力するとディレクトリにParquetファイルが入っているという出力のされ方になる。
そこで、シェルスクリプトを使ってリネームなどをやってもらう:
#!/bin/sh # sh ./convert.sh *.csv set -uex # parquetを出力する scala-cli ./to-parquet.scala.sc -- $@ # CSVはもう不要なのでorigcsvに移動し、書き込み保護状態にする mkdir -p origcsv/ chmod 440 $@ mv $@ origcsv/ # SparkはParquetをディレクトリに出力するので取り出し、ディレクトリを削除する move () { # 元々のファイル名が 収入・支出詳細_2022-01-25_2022-02-24.csv のような形式なので修正する file=$(echo "$1" | perl -pe 's/^.*?_//') # 一時ファイルに逃がす pqfile=$(mktemp XXXX.parquet) mv $1.parquet/*.parquet $pqfile # ディレクトリを消してファイル名を戻す rm -rf $1.parquet mv $pqfile $file.parquet } while (($#)) ; do move $1 shift done
これで、 ./convert.sh *.csv
を実行すると全てのCSVファイルがParquet形式に変換される。CSVファイルはアーカイブに移動される。
Pandasで閲覧してみる
せっかくなのでParquetが他のプラットフォームから読めるか確認してみる。今回はPythonのPandasから読めるか確認してみよう。
# show_parquet.py import pandas as pd import pyarrow as pa import pyarrow.parquet as pq import sys df = pd.read_parquet(sys.argv[1]) print(df) table = pa.Table.from_pandas(df) print(table)
実行してみよう:
$ pip3 install pandas pyarrow $ python3 show_parquet.py foo.parquet
計算対象 日付 内容 金額(円) 保有金融機関 大項目 中項目 メモ 振替 ID 0 1 2022-02-24 PAYPAL *UBERBV EATS -3020 UCカード 食費 外食 None 0 abcdef ... [90 rows x 10 columns] pyarrow.Table 計算対象: int32 日付: date32[day] 内容: string 金額(円): int32 保有金融機関: string 大項目: string 中項目: string メモ: null 振替: int32 ID: string ---- 計算対象: [[1,1,1,1,1,...,1,0,1,1,1]] 日付: [[2022-02-24,...]] 内容: [["PAYPAL *UBERBV EATS",...]] 金額(円): [[-3020,...]] 保有金融機関: [["UCカード",...]] 大項目: [["食費",...] 中項目: [["外食",...]] メモ: [90 nulls] 振替: [[0,...]] ID: [["abcdef",...]]
しっかり読めているようだ。
Pyarrowを使ったParquetへの変換
ぶっちゃけ変換するだけならPyarrowを使うだけでも良い。しかし今回はちょっとスキーマを書きたかったし、型がほしかったのでScalaでやることにした。複雑なことになると結局型がないと辛い。
つづく
次回はEmbulkでElasticSearchとかにデータをロードすることになると思う。