たのしい Scalding 入門

TL; DR: git clone https://github.com/niw/scalding_examples.git


ざっとググった感じ、Scalding についてはまだあまり日本語情報なさそうなので、こう広めたりする目的も込めてちょっとまとめておこうと思います。

Scalding とは

Scalding とは、Scala に相当入れ込んでる Twitter で使われているライブラリで、Hadoop の MapReduce を Scala で簡単に書けるものです。中身は Cascading をラップしたものになっています。

Scala に相当入れ込んでる Twitter ではしかしながら Pig が頻繁に使われているのですがある方面では Scalding が使われておりまして、Pig と肩を並べられるくらいに使えて、さらに Scala の秘めた力も使えるんだぜ…! と、少なくとも Scalding の README には書かれております。

そんな Scalding は英単語の意味としては火傷っていう意味があって、下手に Scalding でググるとちょっと目を覆いたくなるような痛々しい画像がいっぱい表示されるので、Scalding でググる際は要注意です。

Scalding を使ってみる

本家 Scalding のページには、おなじみ WordCount のサンプルがどーんと載っていて、簡単そうに見えるのですが、さて実際にそれを動かすとなるとちょっと大変だったりします。

というのも、Scalding で MapReduce を書く部分はいいのですが、Scalding を Hadoop クラスタに送りつけるためにはうまい具合に jar を作らないとダメだったり、添付されているユーティリティスクリプト scald.rb がかなり読みにくかったり (Ruby の会社だったのに) とかいろいろあってちょっと足踏みしちゃうんですね。

そこで、今回はいろいろ整理してさっぱりさせたこちらのレポジトリを使っていこうかと思います。

まずは動かす

以降、README.md と同じ内容になるわけなのですが、多少コメント多めに。

まずは必要な道具を揃えます。ビルドツールに maven を使っているので入れておきます。あと zinc もオススメです。Intelli J IDEA もあると良いですね。

brew install maven
brew install zinc

ちなみに OS X 前提で書いてますが他の環境でも大差ないかなと思います (Windows は… ちょっとわかりません。)

レポジトリをチェックアウトしてビルドします。

git clone https://github.com/niw/scalding_examples.git
cd scalding_examples
mvn -Pzinc clean package

これで依存するモノすべてをダウンロードしてコンパイルしてテストして適切な jar を作ることができます。

ちなみに、使ってる pom.xml は依存関係の中で CDH3(ちょっと古い) がデフォルトになってるので、使ってる Hadoop に合わせて hadoop-core (や hadoop-client) のバージョンを変えてください。そうしないといざ Hadoop にジョブを送りつけてもプロトコルちゃうねんとか言われてしまいます。

これで準備が整ったので何もしない MapReduce を実行してみましょう。

$ echo "Hello\tWorld" > input
$ src/main/scripts/run.sh --local EchoJob --in input --out output
...
$ cat output
Hello World

おお。何もしないですね。タブ区切りのファイルを読んでタブ区切りのファイルに書き出すというまったくもって Map も Reduce もしていない ジョブです。

class EchoJob(args: Args) extends Job(args) {
  Tsv(args("in"), ('key, 'value))
      .write(Tsv(args("out")))
}

src/main/scripts/run.sh は必要最小限のユーティリティスクリプトになっています。--local をつけるとローカルモードになって hadoop コマンドとか使いませんが、作った jar を javahadoop で実行しているだけです。

Hadoop クラスタで実行する

では、次に Hadoop クラスタに実際に MapReduce ジョブを送ってみます。

Hadoop の準備

その前に Hadoop の擬似クラスタとかがローカルやどこかで動いていないといけません。ない場合は次の手順でお手軽にインストールしておきます。

まず Hadoop と言っても Linux と同じでいろんなディストリビューションがあって、本家はバージョン番号意味不明なことになっていたりしてまったくどれを使ったらいいのか謎いのですが、とりあえず ClouderaCDH を使っておきましょう。RedHat の Linux みたいな感じ。

cd /usr/local
curl -O http://archive.cloudera.com/cdh/3/hadoop-0.20.2-cdh3u6.tar.gz
tar xzvf hadoop-0.20.2-cdh3u6.tar.gz
ln -s hadoop-0.20.2-cdh3u6 hadoop

この CDH3 は古いのですが、CHD 4.3 とか Hadoop 2.x を使うと設定の警告が出まくったりします。org.apache.conf.ConfigurationDeprecated 警告の実装が残念なことが原因なのですが、まあ枯れてるほうがトラブル少ないしいろいろググりやすいでしょ、ということで。

次に擬似クラスタ環境の設定をします。本来は複数台のホストで実行する環境なのですが、全部手元で動かしても動きます。

cd hadoop
mv conf conf.original
cp -Rp example-confs/conf.pseudo conf

基本添付されている設定ファイルで問題ないのですが、必要であれば hadoop.tmp.dirdfs.name.dir を変えておきます。

$ mkdir -p /usr/local/hadoop/var/cache
$ cat | patch -p1
--- a/conf/core-site.xml
+++ b/conf/core-site.xml
@@ -11,3 +11,3 @@
      <name>hadoop.tmp.dir</name>
-     <value>/var/lib/hadoop-0.20/cache/${user.name}</value>
+     <value>/usr/local/hadoop/var/cache/${user.name}</value>
   </property>
--- a/conf/hdfs-site.xml
+++ b/conf/hdfs-site.xml
@@ -25,3 +25,3 @@
      <name>dfs.name.dir</name>
-     <value>/var/lib/hadoop-0.20/cache/hadoop/dfs/name</value>
+     <value>/usr/local/hadoop/var/cache/hadoop/dfs/name</value>
   </property>

/usr/local/hadoop/binPATH に追加して hadoop コマンドを使えるようにしておきます。

export PATH=/usr/local/hadoop/bin:$PATH

で、HDFSをフォーマットして

hadoop namenode -format

namenode, datanode, jobtracker と tasktracker を起動します。全部デーモンなんですが、Procfile つくって foreman とかで起動してやるとログが見えてデバッグが楽です。

hadoop namenode
hadoop datanode
hadoop jobtracker
hadoop tasktracker

これらはウェブインターフェイスも提供してくれるので、Hadoop ジョブの状態は http://localhost:50030/ とかで見られます。

Hadoop クラスタで動かしてみる

さて、おなじみの WordCount いってみましょう。

$ cat > input
meow nyan purr
purr nyan nyan
$ hadoop fs -put input input
$ src/main/scripts/run.sh WordCountJob --in input --out output
...
$ hadoop fs -cat 'output/part-*'
meow    1
nyan    3
purr    2

ちゃんと動きましたね。このジョブは行から単語を抜き出すところが通常の Scala のコードとなっていて、 Scalding の良さを実感できるようになっています。

// --in オプションのパスをテキストファイルとして読んで
// 各行のテキストは line フィールドに入るので → [offset, line]
TextLine(args("in"))
  // tokenize() で word フィールドに展開して → [offset, line, word]
  .flatMap('line -> 'word) { line: String => tokenize(line) }
  // 同じ word でグループ化してその数を数えて → [word, size]
  .groupBy('word) { _.size }
  // --out オプションのパスにタブ区切りで書きだす
  .write(Tsv(args("out")))

// ここは普通の Scala
def tokenize(text: String): Array[String] =
  text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")

次のステップ

ざっと最初の一歩は眺められたかなと思うのでつぎはいろいろなジョブを書いて遊んでみると良いかなと思います。Intelli J IDEA で Scala プラグインを入れて pom.xml を開けばそのまま IDE な環境で楽にできます。

idea pom.xml

src/main/scala/Main.scala は Scalding のジョブのブートストラップの部分を簡略したもので、通常 com.twitter.scalding.Tool オブジェクトの main() からジョブを開始させますが、自分で書くこともできます。

ジョブのテストは src/test/scala/WordCountJobSpec.scala などを参考に使ってください。 Spec2 を使う際に in { ... } の中にジョブを書くと困ったことになるので、should { ... } の中に書いて in { ... } はそれぞれの sink ごとに書くのが良いです。

その他、本家の Wiki ページにはかなりの情報がまとまっていますので参考にしましょう。

同じ MapReduce のジョブを Ruby(Hadoop Streaming), Pig, Hive, Scalding などで書いた Rosetta Code も便利です。


というわけで、Scalding を使うと簡単に Scala で MapReduce ジョブが書けることがわかりました。

また Scalding は、ぱっと見わかりやすいように DSL 的なものを Scala の implicit とかの魔術を使って作ってあります。ですので一度なにかよくわからないことが起こるとまったくなにが起こってるのかわからないという素敵な側面もあって楽しそうです。 今後も活用していきたいなあと思います。