Apache SparkでCSVファイルを読み込むときに気をつけておきたい挙動に遭遇したのでメモしておく。
Apache Spark
Apache Sparkとは、Apache財団によってメンテナンスされている分散型の計算・分析エンジンで、Scalaを第一級言語としてサポートしている。要するにScalaのPandas/Numpyだと思ってもらえればよい。
先日↑のような記事も書いたのでもしよければ読んでSparkの雰囲気を掴んでほしい。
spark.read.csv
SparkにはCSVをデータフレームとして読み込むためのspark.read.csv
が存在する。
実際的には以下のようにして使う:
// SparkSessionをまず作成する必要がある。ドキュメントでspark.という表記が登場した場合はこのSparkSessionのことを指している。 val spark = SparkSession .builder() .appName("foobarbuzz application") .config("spark.master", "local") // 実行するマスターノードを指定するのが必須なのでlocalとする .getOrCreate() val df = spark.read.option("header", "true").csv("data.csv") val dfWithoutHeader = spark.read.csv("data.csv") // ヘッダいらない場合はこちらでよい
こうすると以下のようにデータが読み込まれる。
df.show()
+---+------------+--------+ |ISO|Satisfaction| NGDPDPC| +---+------------+--------+ |AUS| 7.1|51412.69| |AUT| 7.2|44267.81| |BEL| 6.8|41147.26| |BRA| 6.1| 8846.48| |CAN| 7.0|43626.47| |CHL| 6.2|13494.11| |COL| 5.7| 6337.03| |CRI| 6.3| 11635.2| |CZE| 6.9|17842.87| |DNK| 7.5| 53478.5| |EST| 6.5|17403.81| |FIN| 7.9|42867.09| |FRA| 6.7|37937.86| |DEU| 7.3|41107.22| |GRC| 5.8|18023.87| |HUN| 6.0|12690.17| |ISL| 7.6|53227.63| |IRL| 7.0|61902.66| |ISR| 7.2|36221.12| |ITA| 6.5| 30463.7| +---+------------+--------+ only showing top 20 rows
カラム重複時の挙動
さて、データの都合でカラム名が重複した場合はSparkは混乱を避けるために以下のような処理を行う:
- 0-originなカラムのインデックスを使ってカラム名にsuffixする
実際のコードだとこういう感じになる。
//> using scala "2.13" //> using lib "org.apache.spark::spark-core:3.3.1" //> using lib "org.apache.spark::spark-sql:3.3.1" //> using lib "org.apache.spark::spark-mllib:3.3.1" import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("foobarbuzz application") .config("spark.master", "local") // 実行するマスターノードを指定するのが必須なのでlocalとする .getOrCreate() val df = spark.read.option("header", true).csv("duplication.csv") df.show()
ここで、duplication.csv
はこんな感じ。
header,header,header 0,1,2 3,4,5 6,7,8
すると、以下のように出力される:
$ scala-cli duplicating-header.scala.sc ... +-------+-------+-------+ |header0|header1|header2| +-------+-------+-------+ | 0| 1| 2| | 3| 4| 5| | 6| 7| 8| +-------+-------+-------+ ...
数字がsuffixとしてひっついているのがわかるはず。
カラムのcase sensitivity
さて、spark.read.csv
メソッドはデフォルトではcase insensitiveに振る舞うので、case sensitiveには異なるようなカラムがある場合はこれを同一のカラム名だと判定し、前項のsuffixingを行ってしまう。
duplication.csv
を修正して、カラム名の一部を大文字にしてみよう。
header,HEADER,Header 0,1,2 3,4,5 6,7,8
スクリプトを実行すると相変わらずsuffixingが行われていることがわかるが、見た目上はcaseが保存されているので厄介だ。
$ scala-cli duplicating-header.scala.sc ... +-------+-------+-------+ |header0|HEADER1|Header2| +-------+-------+-------+ | 0| 1| 2| | 3| 4| 5| | 6| 7| 8| +-------+-------+-------+
カラムのcase sensitivityをcase sensitiveに設定するには、SparkSession
のconfig
メソッドに"spark.sql.caseSensitive" -> true
を渡す。
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("foobarbuzz application") .config("spark.master", "local") .config("spark.sql.caseSensitive", true) // ここでcase sensitivityをグローバルに変更する .getOrCreate()
このコンフィグによって、case違いのカラムは弁別されるようになる。
+------+------+------+ |header|HEADER|Header| +------+------+------+ | 0| 1| 2| | 3| 4| 5| | 6| 7| 8| +------+------+------+
結語
初見でこの挙動に遭遇した場合、特にcase違いのパターンを踏んだ場合、この数字なんやねんとなって何が起こっているのか分からなくなってしまうと思う。自分もぜんぜん分からなかった。
Sparkはデフォではcase insensitiveなんだよね、という情報を探すのに苦労したので、こうしてまとめた記事を書こうと思ったというわけ。
最後に言いたいんだけど、Sparkのドキュメンテーションの検索機能がクソ!! ちゃんと全文検索させてくれ!!
参考文献
この記事では、case sensitivityについてちょろっと書いてある。
Field names in the schema and column names in CSV headers are checked by their positions taking into account spark.sql.caseSensitive.
とシレッと書いてある。
Sparkのconfigurationがここに書いてある(ことになっている)けど、今回使ったコンフィグここに乗ってないんですけど!!!!!!!!!!!!!!!!!!!!!!!!!!!11111 どうなっとんねん