読者です 読者をやめる 読者になる 読者になる

froglog

プログラミングや統計の話など

はじめての Apache Spark アプリケーション開発で困ったところ

Apache Spark Java

このエントリについて

ここ2ヶ月ぐらい Apache Spark でバッチ処理をするアプリケーションを作っていました。 Apache Spark でがっつり何かを作るのは今回が初めてで、結構詰まったりしたところがありました。 自戒、および他の誰かの役にたてばという意味をこめてどういうところで困ったかというメモを残しておきます。

ちなみに Spark 1.1.0 で Java による開発という前提です。 (作ってるうちに Spark 1.2.0 が出てしまいました…)

困ったところ諸々

AWS EMR 上での jar ファイル利用

EMR 上で Spark アプリケーションとして作った jar ファイルを実行する方法が Apache Spark、AWS ともに公式提供されていません。 shell から動かせますよ、という記事ならあるのですが、やりたいのはそうじゃない。 最新の Spark バージョンが反映されるのもあまり早くありません。(本文執筆時点で 1.1.0 までしか載っていない)

なんかを参考にして頑張るしかありません。 そのうち誰かがいい感じの bootstrap スクリプトのセットをどこかに up してくれることでしょう…。(もしくは既に)

また Java で Spark を使う場合、ラムダ式を使えるかどうか、つまり Java 8 を使えるかどうかが開発効率にかなり影響します。 EC2 インスタンスはデフォルトでは Java 8 が入っていないので、Java の方にはそこも面倒な障壁になるでしょう。

Kryo Serializer のバイナリファイル入出力

公式ドキュメントにあるとおり、Spark ではシリアライズフレームワークとして Java Serialization と Kryo Serialization の2つを使えます。 ノード間のデータ移動やバイナリ形式でのディスク読み書きでシリアライズが使われるためパフォーマンスに大きな影響があります。 ドキュメントで

Kryo is significantly faster and more compact than Java serialization (often as much as 10x)

とあるとおり、公式では Kryo 推しな雰囲気があります。 Java Serialization の場合は自作クラスに java.io.Serializable を実装する必要がある一方、Kryo はよきにはからってくれる場合がほとんどであり、ここに書いてある速度・サイズの観点のみでなく開発効率でも Kryo に軍配が上がるでしょう。

ところが Kryo にも落とし穴がありました。

Though kryo is supported for RDD caching and shuffling, it’s not natively supported to serialize to the disk. Both methods, saveAsObjectFile on RDD and objectFile method on SparkContext supports only java serialization.

RDD からバイナリ形式でのディスクへのファイル読み書きが Kryo に対応していないとのこと。 saveAsObjectFile() が失敗して、結構悩んで上記リンクにたどりつきました。

解決策はリンク先にあるとおり、自分で byte 形式で読み書きできるメソッドを作ることです。 上記は Scala の例ですが、Java でも少し変えて似たような感じでできました。 早く Spark 本体で対応してほしいところです。

遅延評価

まあこれは Spark でプログラム書くなら最低限知っとけよって話だと思いますが…

次の例はテキストファイルを読み込み、各行の文字数を合計して全体の文字数を出すというものです。

logger.info("ファイル読み込みを開始します。");
JavaRDD<String> lines = sparkContext.textFile("data.txt");
logger.info("ファイル読み込みを完了しました。");

logger.info("各行の文字数カウントを開始します。");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
logger.info("各行の文字数カウントを完了しました。");

logger.info("全体の文字数の合計を開始します。");
int totalLength = lineLengths.reduce((a, b) -> a + b);
logger.info("全体の文字数の合計を完了しました。");

こちらは極端な例ですが、システムのどこで問題が起こったか分かるようにアプリケーションのログを出すというのはよくあると思います。

上記の例は一見問題なさそうなんですが、実はログのメッセージと実際の動きが合っていません。 Spark の処理は効率化のため遅延評価されます。 Transformation 系のメソッドである map() の行を抜けた時点、つまり "各行の文字数カウントを完了しました。" のメッセージの時点ではまだ計算が始まっていません。 Action 系のメソッドである reduce() が実行されて初めて map 処理である文字数カウントの実行が始まります。

うっかりしているとこのように、実際の挙動と異なるタイミングのログメッセージを書いてしまったりします。反省。

Kryo でシリアライズできないケース

Kryo はほとんどの場合よきにはからってシリアライズしてくれる、と前述しました。 よきにはからってくれないケースの話です。

今回私が開発していたアプリケーションでは演算時間が結構シビアであるため、Java のコレクションクラスにもこだわろうという向きがありました。 こちらこのへんを見て、「Koloboke ってやつ良さそうやん?」ってなって Koloboke のコレクションクラスを多用した結果、これが失敗。 どうやら Koloboke のコレクションクラスのオブジェクトは Kryo でシリアライズできない場合があるらしく、NoSuchElementException が出まくって死にました。 一方でなぜか Kryo でシリアライズできてしまう場合もあり、そのために発見に時間がかかってしまいました。憎らしい。

よきにはからってくれない場合は KryoSerializable インターフェースを実装するという手もあります。 シリアライズまわりは何かと Spark 初心者が引っかかりやすいところなのかもしれません。

まとめ

  • Amazon さん、EMR の対応お願いします
  • Kryo でのバイナリ書き出しに注意
  • 遅延評価されていることを忘れるな
  • Kryo でなんでもシリアライズできるわけではない

繰り返しますが Spark 1.1.0 での話なので後続のバージョンでは改善されるかも。

同じようなことに引っかかった場合の一助にしていただけると幸いです。

参考になったサイト等