こんにちは。SI部の腰塚です。

RDBやデータウェアハウスの仕事に携わることが多かった筆者は、数年前からたびたび聞こえたビッグデータ分析や機械学習のための分散処理フレームワークに興味を覚えたものの、ついぞアクセスしないままここまで来てしまいました。
今回ブログを書くにあたって、せっかくなのでイチから手さぐり入門し、いまさら他人に聞けない分散処理の初歩からhadoop・sparkを触ってみるまでをまとめたいと思います。

1.分散処理の基礎知識

1-1.分散処理の処理方式:MapReduce

まず分散処理とは、ひとつの計算処理をネットワークで接続した複数のコンピュータで同時並列で処理することです。

ビッグデータ活用の市場が日々大きくなるに従って、数百テラ~ペタのデータ処理も珍しいものではなくなっており、日常的にこの規模のデータを扱うシステムでは、現実的な時間的・費用的コストで処理する工夫が必要不可欠です。
データ処理の高速化のためには、単純に処理サーバを高性能にすることも手段の一つですが、例えばサーバの処理性能を2倍にすると費用的コストは2倍では収まりませんし、将来処理するデータが更に増えた場合に性能を上げるのも非常にコストがかかります。
もうひとつの手段としては、安価なサーバを複数台用意して処理を割り振ることで、全体の処理能力を向上させることが考えられます。これが分散処理です。
これには処理割り振りのための複雑なプログラミングが必要になりますが、コスト的には安く、将来的な拡張も簡単です。

この分散処理について、2004年にGoogleが論文で発表した独自の分散処理フレームワークの処理方式がMapReduceです。
GoogleはこのMapReduceを使って、Webサイト内の文字列検索や、特定の単語が含まれるWebページのリスト作成などを行っていると発表しました。
これらの処理は一見シンプルではありますが、その扱うデータ量が2004年8月当時すでに3ペタバイト/1カ月という膨大さであったため、非常に驚きをもって受け入れられました。
MapReduce: Simplified Data Processing on Large Clusters
http://research.google.com/archive/mapreduce.html

1-2.MapReduceのオープンソース実装:Hadoop

このMapreduceの技術理論に、同じくGoogleが発表した分散処理ファイルシステムGFS(Google File System)と分散データベースBig tableの基盤技術を取り入れて実装されたのがHadoopです。
のちにApache Hadoopとしてオープンソース公開されたことで、分散処理を簡単に実装することが可能になり、ビッグデータ活用市場の拡大と分散処理の需要に乗って爆発的に広がりました。
現在では様々な有名企業が自社のシステムにHadoopを導入しています。

1-2-1.Hadoopの導入事例

具体的に導入事例をあげてみますと、

  • Facebook

広告の解析や「フェイスブック・インサイト」という解析ツール上でサイト来訪ユーザ数やいいね!の数を集計するためHadoopを利用。毎日135TB超のデータを処理。

  • Visa

カードの利用パターンを分析することで不正利用を検知するシステムにHadoopを導入。730億件のトランザクションデータ処理を数週間から13分に短縮した。

  • リクルート

「Hotpepper」「Suumo」「じゃらん」などの20を超えるWebアプリケーションで、バッチ処理・ログ解析・レコメンドなど多岐にわたってHadoopを導入。

…などなど、Webサービス・SNSに携わる企業や、データマイニングで導入例が多いように見受けられます。特にログ解析や機械学習を利用したカスタマーへのレコメンド、バッチによる集計処理等に利用されることが多いようです。
この傾向は、Web系業界が扱う、膨大かつ将来的に更に拡大していくデータ・ログを分析・解析するのにHadoopが適しているからと言えるのではないでしょうか。

1-2-2.Hadoopの特徴

では、なぜHadoopがそれらのシステムに向いているのでしょうか。
Hadoopがもともと大規模データを扱いやすい分散処理であるからとも言えますが、私は以下のようなHadoopの設計思想と特徴が需要にマッチしていたからだと考えます。
数ある特徴の中で、筆者が特に重要では?と思ったものを挙げてみます。

①オープンソースの分散処理フレームワーク
Mapper、Reducerと呼ばれる2つのスクリプトを作成するだけで、簡単に分散処理を実装できます。
分散処理を独自で開発するには、プロセス起動・監視・プロセス間の通信等の管理、データ処理の効率的な分散、特定ノード故障時の動作など、複雑な考慮が必要不可欠ですが、
フレームワークを使用することによりそのほとんどをHadoopに任せることができるようになりました。
それらがすべてオープンソース実装されているためソフトウェアの導入コストも低く、運用時に不具合が発生したときにも自分でソースコードを見て修正することも可能です。

②安価なサーバをノードとして増やすことで簡単にスケールできる
将来的にどんどん増え続けることが見込まれるビッグデータやWebサービスのシステムでは、サーバ性能の拡張が必要になることも多いと思います。
冒頭でご説明した通り、分散処理を導入していれば、ひとつの処理サーバの高性能化(=スケールアップ)ではなく、安価なサーバを複数増やす(=スケールアウト)の方式をとることが可能で、コストも安価で済みます。
Hadoopではスケールアウトを前提として設計されているため、サーバ台数の増加に比例して性能を上げていくことが可能です。

③高い耐障害性
耐障害性を重視して設計されており、処理中に一部の構成ノードが故障しても、すぐに別ノードへ処理を振り分けられます。
(ただし、マスターサーバの障害については対処が無く単一障害点になっているそうです。ここは大きな弱点かも?)

④さまざまな言語で処理を書ける
HadoopはJavaで書かれたフレームワークのためJavaで記述するのが一般的ですが、Hadoop Streamingというツールが用意されており、標準入出力を持つ言語であればあらゆる言語でスクリプトを作成できます。
また、HiveやPigといったDSLも提供されています。

⑤大きなデータを処理するバッチ処理には向くが、リアルタイム処理には向かない
分散処理を実行するためにオーバーヘッドが大きく、パフォーマンスが求められるリアルタイム処理には向かない傾向があります。
そういったデータを扱うシステムでは、RDBとの住み分けや役割分担も視野に入れることも考えられます。

1-2-3.Hadoopの構成

上記のようなHadoopの特徴は、以下のような基礎構成要素から成り立ちます。簡単に見ていきましょう。
Hadoopは分散処理・ファイルシステム・データベースの機能として以下のソフトウェアが盛り込まれており、それぞれがGoogleのMapreduce、GFS、Big tableに相当しています。
図1 Hadoopの基盤イメージ

・Hadoop Mapreduce
分散処理フレームワーク。Mapperとreducerというプログラムを作成するだけでユーザは分散処理を実行することができます。
Hadoop Streamingというツールで、標準入出力があるプログラミング言語であればどんな言語でもMapperとReducerを記述できます。

・HDFS(Hadoop Distributed File System)
分散処理ファイルシステム。1台のサーバのストレージに収まりきらない大容量のデータを、何台かのサーバに分割して配置し管理するための仕組み。
HDFSは読み書きを高速化するため、ファイルを一定の大きさのブロック(デフォルトで64メガバイト)に分割し、複数の記憶装置に分散して保存し、読み込み書き込みを並列に実行できるようにしています。

・Hbase
HDFS上に構築された列指向分散データベース。
高い書き込み性能と優れた拡張性を持ち、拡張した後も書き込み性能がそれほど落ちないという特徴があります。

1-2-4.MapReduceの処理方法

分散処理のキモ、Hadoop MapReduceの内部処理について少しだけ掘り下げます。
MapReduceは、Mapフェーズ、Shuffleフェーズ、Reduceフェーズの順に処理されます。
図2 1つのジョブのMapReduce内部処理イメージ

ユーザが意識するのはMap・Reduceフェーズのみで、この2フェーズの動作をプログラム実装するだけで分散処理を実現できます。これがMapper、Reducerです。
図2のような一連の処理のひと塊りをジョブと呼び、それを下記図3のようにタスクという小さい処理に分割してリソース状況や処理の進行状況、故障の有無によってノードに振り分け処理するのが分散処理です。

図3 ジョブをタスクに分散し実行

2014/10月現在、HadoopMapReduceにはVersion1と2が存在し(いわゆるMR1とMR2)、それぞれHadoop1.X.XとHadoop2.X.Xに搭載されています。
違いとしては、従来のMR1がジョブを実行するマスター側のジョブトラッカーとスレーブ側のタスクトラッカーで処理されるのに対し、MR2がYARNという汎用的な分散処理ハンドリングのフレームワークでマスター・スレーブを実現していることが挙げられます。
詳しくは下記をご参照のこと。
http://www.tomsitpro.com/articles/hadoop-2-vs-1,2-718.html

1-3.分散処理を高速化しインタラクティブ分析を可能に:Spark

もうひとつ、Hadoopとよく比較される分散処理基盤として、Apache Sparkをご紹介します。
SparkはHadoopのMapReduce部分に置き換わることを目指して開発された、Scalaで分散処理を行うフレームワークで、いわば高速化されたMapReduceといえます。

HadoopではHDFSに貯めたデータに対して機械学習を行い高度な解析を行うシステムがよく導入されていると前述しましたが、機械学習などの繰り返し処理を行うものではなかなか性能が出にくい理由があります。
例えば、MapReduceで各ノードにジョブを起動する際プログラムの起動に15秒ほどかかったり、ジョブの中で分析対象データをHDFSから取得→集計→HDFSに出力→データを取得…と処理したりすると、ストレージへのオーバーヘッドも無視できない処理時間になります。
そこで、繰り返し処理やドリルダウン分析を行うために、HDFSに特殊なキャッシュを設け、インメモリで高速な分散処理を行うことを考えたのがSparkです。

例えば数TBあるログデータを分析をしたい時、以下のように比較できます。
図4 HadoopとSpark ログ分析の例

このようにSparkのインメモリ分散処理ではHDFSへの書き込みが少ない分非常に高速(※10~100倍高速だそうです)で、繰り返しの処理やドリルダウン分析に適しているようです。
詳しくは下記参照のこと。
http://www.cloudera.co.jp/products-services/cdh/apache-spark.html

1-3-1.Sparkの特徴

Sparkの特徴も簡単にご紹介します。

①RDD(Resillient Distributed Datasets)
繰り返し利用するデータをキャッシュ上に保持することが可能な仕組み。このため繰り返し処理で非常に高いパフォーマンスを発揮します。
通常キャッシュは揮発性ですが、RDDではMapReduceが保持していた高い耐障害性も兼ね備えていて、必要最低限の復旧が可能であるそうです。

②DSL
Maper・Reducerの代わりにScalaのDSLを記述できる。
より汎用的にプログラミングできるようになっています。

③Hadoopとの互換性
自前では分散ファイルシステムを持っていませんが、HadoopのHDFSを利用可能です。
またリソース管理の面でも、通常は自前のstandalone cluster managerで動きますが、前述したHadoop2のYARN上でも稼動します。
これにより、HadoopとSpark双方の得意な部分を組み合わせ、バッチをHadoop+オンライン分析をSparkなどの組み合わせたシステムも容易に作ることができます。

④インメモリでも高い耐障害性
キャッシュ上でデータセットを保持しますが、Hadoop同様高い耐障害性を持っており、たとえRDDのデータが一部損なわれてもRDD内に「入力元となるRDD」と「処理内容」を保持しているためにRDDデータを使って再作成することができます。

詳しくは下記参照のこと。
http://www.ibm.com/developerworks/jp/opensource/library/os-spark/

長くなってしまいましたが、これまでのHadoop・Sparkの情報をまとめてみると…
 ・HadoopとSpark、それぞれの得意分野を組み合わせることで、安価かつ高速で、耐障害性の高い分析システムの構築が可能。
 ・数テラ~数ペタのビッグデータ分析をHadoopバッチジョブとSparkオンライン検索で自在に操ることが可能。
ということが言えそうですが…本当にそんな夢のようなことが可能なのでしょうか?

次章、実際に試してみたいと思います。

2.HadoopとSparkの実践

それでは実際に構築して試してみたいと思います。
今回は触り心地を見るためなのであくまで安価に済ますべく、AmazonEMRやCDHなどを使わない素のHadoop(疑似分散モード)をCentOSに構築します。

使用する環境とインストールする対象は以下の通りです。
・CentOS6.5.x86_64(64bit)   …私有のPCにVMware上でインストール。容量20GB / RAM 1024MB
・jdk1.7.0.65
・Hadoop2.5.0
・Hive0.13.1
・scala2.10.4
・Spark1.1.0

2-1.Hadoopのインストール

CentOSを立ち上げ、専用ユーザでログインした直後から開始します。
ユーザは「hdspark」としました。

//ホームに作業フォルダを作成します
$ mkdir tmp/
$ cd tmp/

//以下、rootで作業します
$ su

//リポジトリをepelに
# wget http://ftp-srv2.kddilabs.jp/Linux/distributions/fedora/epel/6/x86_64/epel-release-6-8.noarch.rpm
# rpm -ivh epel-release-6-8.noarch.rpm

//jdkをインストール。
# yum install java-1.7.0-openjdk-devel.x86_64 --enablerepo=epel

//前提ソフトもろもろもインストール
# yum install gcc-c++ curl make cmake zlib zlib-devel --enablerepo=epel

//準備ができたらHadoopをダウンロードし展開します
# wget http://ftp.riken.jp/net/apache/hadoop/common/hadoop-2.5.0/hadoop-2.5.0.tar.gz
# tar -xzvf hadoop-2.5.0.tar.gz
# chown -R hdspark:hdspark hadoop-2.5.0
# mv hadoop-2.5.0 /usr/local/

//ログインシェルに環境変数を設定
# cd /home/hdspark/
# vi .bashrc
	/* 以下を追記
	export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64
	export HADOOP_HOME=/usr/local/hadoop-2.5.0
	export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
	export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
	export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$JAVA_HOME/bin:$PATH
	*/

//ここからは操作ユーザにログインしHadoopを設定します
# su - hdspark

//sshでlocalhostにパスなしでログインができるよう設定
$ cd
$ ssh-keygen -t dsa
$ cat .ssh/id_dsa.pub >> .ssh/authorized_keys
$ chmod 600 .ssh/authorized_keys
$ ssh localhost

//パスの確認
$ hadoop version

//etcファイルのプロパティ追加
$ cd $HADOOP_HOME/etc/hadoop
$ vi core-site.xml
	/* 以下を追記
	<property>
	  <name>fs.defaultFS</name>
	  <value>hdfs://localhost:9000</value>
	</property>
	*/
$ vi hdfs-site.xml
	/* 以下を追記
	<property>
	  <name>dfs.replication</name>
	  <value>1</value>
	</property>
	*/
$ vi yarn-site.xml
	/* 以下を追記
	<property>
	  <name>yarn.nodemanager.aux-services</name>
	  <value>mapreduce_shuffle</value>
	</property>
	<property>
	   <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
	   <value>org.apache.hadoop.mapred.ShuffleHandler</value>
	</property>
	*/

//Namenodeのフォーマット
$ hadoop namenode -format

//Hadoop のノード類を起動
$ start-all.sh

//稼動確認
$ jps
	/*このうちjps以外の5プロセスが表示されていれば無事起動完了
	16277 ResourceManager
	17651 DataNode
	17529 NameNode
	18071 Jps
	16367 NodeManager
	16145 SecondaryNameNode
	*/

//hdfsにフォルダを作成してみる
$ hadoop fs -mkdir /input/
$ hadoop fs -ls -R /
	/*hdfsの中が見えればインストール完了
	drwxr-xr-x   - hdspark supergroup          0 2014-09-30 06:14 /input
	*/

稼働確認がてら、オーソドックスなサンプルwordcountを使ってみます。

//こんなファイルを用意します
$ cat a
	a a b b b c b c
$ cat b
	ab a b b c c c

//この2ファイルを上で作成したhdfs://inputに配置します
$ hadoop fs -put a /input/
$ hadoop fs -put b /input/
$ hadoop fs -ls -R /
	drwxr-xr-x   - hdspark supergroup          0 2014-09-30 06:24 /input
	-rw-r--r--   1 hdspark supergroup         16 2014-09-30 06:24 /input/a
	-rw-r--r--   1 hdspark supergroup         15 2014-09-30 06:24 /input/b
$ hadoop fs -cat /input/*
	a a b b b c b c
	ab a b b c c c

//wordcountを呼び出して、/input/配下のテキストファイル内にそれぞれの単語が何文字あるかカウント
$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount /input/ /output/
	14/09/30 06:26:41 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
	14/09/30 06:26:41 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
	・
	・
	・
	        File Input Format Counters
	                Bytes Read=31
	        File Output Format Counters
	                Bytes Written=17

//outputというフォルダができ、結果が出力されています
$ hadoop fs -ls -R /
	drwxr-xr-x   - hdspark supergroup          0 2014-09-30 06:24 /input
	-rw-r--r--   1 hdspark supergroup         16 2014-09-30 06:24 /input/a
	-rw-r--r--   1 hdspark supergroup         15 2014-09-30 06:24 /input/b
	drwxr-xr-x   - hdspark supergroup          0 2014-09-30 06:27 /output
	-rw-r--r--   1 hdspark supergroup          0 2014-09-30 06:27 /output/_SUCCESS
	-rw-r--r--   1 hdspark supergroup         17 2014-09-30 06:27 /output/part-r-00000
$ hadoop fs -cat /output/*
	a       3
	ab      1
	b       6
	c       5
//成功!

2-2.HIVEのインストール

このままMapperとReducerをプログラミングしてもよいのですが、筆者はRDBに慣れているため、せっかくなのでHIVEを使ってみます。
HIVEとは、Facebookによって開発された、RDBを使う感覚でHadoopジョブを実行できる仕組みです。HiveQLというSQLライクのDSL言語を使うだけで内部で勝手にMapとReduceに変換して実行される優れモノです。

//HIVEのインストール
$ su
# cd /usr/local
# wget http://ftp.tsukuba.wide.ad.jp/software/apache/hive/hive-0.13.1/apache-hive-0.13.1-bin.tar.gz
# tar -xzvf apache-hive-0.13.1-bin.tar.gz
# chown -R hdspark:hdspark apache-hive-0.13.1-bin
# ln -sv apache-hive-0.13.1-bin hive
# su - hdspark
$ vi .bashrc
	/* 以下を追記
	export HIVE_HOME=/usr/local/hive
	export PATH=$M2:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH
	*/
$ source .bashrc

//設定を追加
$ cd $HIVE_HOME
$ mkdir logs
$ cd conf
$ cp -p hive-log4j.properties.template log4j.properties
$ vi log4j.properties
	/* 以下のように記述修正
	#hive.log.dir=${java.io.tmpdir}/${user.name}
	hive.log.dir=$HIVE_HOME/logs
	*/
$ cp -p hive-env.sh.template hive-env.sh
$ chmod 755 hive-env.sh
$ cp -p hive-exec-log4j.properties.template hive-exec-log4j.properties
$ cp -p hive-default.xml.template hive-site.xml

//稼働確認のためHIVEを実行
$ hive
hive> show databases;
	OK
	default
	Time taken: 3.571 seconds, Fetched: 1 row(s)
hive>
//成功!

HIVEもインストールできたところで、HDFS上に仮想データベースを作成してみます。
今回検証のために、過去の東京都内の降水量をオープンデータとして公開しているサイトから、2014/4~2014/9の降水量データを取得してきました。
東京都水防災総合情報システム
http://www.kasen-suibo.metro.tokyo.jp/im/other/tsim0110g.html

00:00,新宿,0
00:10,新宿,1
00:20,新宿,3

の形式で、毎日10分ごとの各計測点の降水量が記載されているCSVファイルです。
ビッグデータと言えるほどの規模とはいきませんが、これを使って仮想データベースを作成してみます。

//DBの作成
hive> CREATE DATABASE mydb;
hive> USE mydb;
hive> CREATE TABLE uryou (time string ,text string,uryou int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
hive> desc uryou;
	desc uryou
	OK
	time                    string
	text                    string
	uryou                   int
	Time taken: 0.239 seconds, Fetched: 3 row(s)

//ローカルのファイルを1点アップロード
hive> LOAD DATA LOCAL INPATH "/home/hdspark/tmp/uryou/Tokyo_Uryo10min_20140401_2.csv" INTO TABLE uryou;
hive> SELECT COUNT(*) FROM uryou;
	・・・
	OK
	null
	Time taken: 52.306 seconds, Fetched: 1 row(s)
hive> SELECT text,sum(uryou) FROM uryou WHERE text LIKE '小%' GROUP BY text order by text;
	・・・
	OK
	小平霊園        0
	小曾木  0
	小河内  0
	小笠原  0
	Time taken: 52.306 seconds, Fetched: 1 row(s)
//普通にSQLみたいに使えます

//続いて、半年のデータをすべてテーブルにinsert
hive> LOAD DATA LOCAL INPATH "/home/hdspark/tmp/uryou/Tokyo_Uryo10min_20140402_2.csv" INTO TABLE uryou;
	・
	・
	・
hive> LOAD DATA LOCAL INPATH "/home/hdspark/tmp/uryou/Tokyo_Uryo10min_20140930_2.csv" INTO TABLE uryou;
	LOAD DATA LOCAL INPATH "/home/hdspark/tmp/uryou/Tokyo_Uryo10min_20140930_2.csv" INTO TABLE uryou
	Copying data from file:/home/hdspark/tmp/uryou/Tokyo_Uryo10min_20140930_2.csv
	Copying file: file:/home/hdspark/tmp/uryou/Tokyo_Uryo10min_20140930_2.csv
	Loading data to table mydb.uryou
	Table mydb.uryou stats: [numFiles=183, numRows=0, totalSize=44604201, rawDataSize=0]
	OK
	Time taken: 0.381 seconds
hive> SELECT COUNT(*) FROM uryou;
	・・・
	OK
	null
	Time taken: 19.417 seconds, Fetched: 1 row(s)

//この半年の、弊社Casleyのある渋谷区周辺の降雨量を時刻別にサマリ
hive> SELECT CONCAT(SUBSTR(time,1,2),':00') AS hour,SUM(uryou)
       > FROM uryou
       > WHERE text LIKE '渋谷%'
       > GROUP BY CONCAT(SUBSTR(time,1,2),':00') ORDER BY hour;
	・・・
	OK
	00:00   34
	01:00   49
	02:00   57
	03:00   37
	04:00   28
	05:00   31
	06:00   30
	07:00   19
	08:00   16
	09:00   13
	10:00   23
	11:00   38
	12:00   25
	13:00   22
	14:00   20
	15:00   36
	16:00   68
	17:00   27
	18:00   46
	19:00   72
	20:00   57
	21:00   29
	22:00   40
	23:00   39
	Time taken: 24.182 seconds, Fetched: 24 row(s)
//成功!

データの読み込みに時間がかかる半面、書き込みは非常に速い印象でした。
14400行×約180ファイル=260万件超でしたが、ファイル分割のオーバーヘッド分を含めても10秒ほどで全てアップロード完了しており、HBaseの書き込み性能の高さの一端かもしれません。

2-3.Sparkのインストール

さて、残るSparkのインストールです。
Standalone modeで稼働させるため今回はビルドは行わないものとします。

//まずはscalaのインストール
$ su
# cd /usr/local/
# wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz
# tar -xzvf scala-2.10.4.tgz
# chown -R hdspark:hdspark scala-2.10.4
# ln -sv  scala-2.10.4 scala

//続いてSparkです
# wget http://archive.apache.org/dist/spark/spark-1.1.0/spark-1.1.0-bin-hadoop2.4.tgz
# tar -xzvf spark-1.1.0-bin-hadoop2.4.tgz
# chown -R hdspark:hdspark spark-1.1.0-bin-hadoop2.4
# ln -sv  spark-1.1.0-bin-hadoop2.4 spark

//操作ユーザにログインしログインシェルに環境変数を追記
# su - hdspark
$ cd
$ vi .bashrc
	/* 以下を追記
	export SCALA_HOME=/usr/local/scala
	export SPARK_HOME=/usr/local/spark
	export PATH=$SCALA_HOME/bin:$PATH
	*/
$ source .bashrc

//稼働確認
$ cd $SPARK_HOME
$ ./bin/spark-shell
	Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
	14/10/01 05:53:08 INFO SecurityManager: Changing view acls to: hdspark,
	14/10/01 05:53:08 INFO SecurityManager: Changing modify acls to: hdspark,
	14/10/01 05:53:08 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hdspark, ); users with modify permissions: Set(hdspark, )
	14/10/01 05:53:08 INFO HttpServer: Starting HTTP Server
	14/10/01 05:53:09 INFO Utils: Successfully started service 'HTTP class server' on port 33066.
	Welcome to
	      ____              __
	     / __/__  ___ _____/ /__
	    _\ \/ _ \/ _ `/ __/  '_/
	   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
	      /_/

	Using Scala version 2.10.4 (OpenJDK Client VM, Java 1.7.0_65)
	Type in expressions to have them evaluated.
	Type :help for more information.
	14/10/01 05:53:22 INFO SecurityManager: Changing view acls to: hdspark,
	14/10/01 05:53:22 INFO SecurityManager: Changing modify acls to: hdspark,
	14/10/01 05:53:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hdspark, ); users with modify permissions: Set(hdspark, )
	14/10/01 05:53:24 INFO Slf4jLogger: Slf4jLogger started
	14/10/01 05:53:24 INFO Remoting: Starting remoting
	14/10/01 05:53:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:36288]
	14/10/01 05:53:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@localhost:36288]
	14/10/01 05:53:25 INFO Utils: Successfully started service 'sparkDriver' on port 36288.
	14/10/01 05:53:25 INFO SparkEnv: Registering MapOutputTracker
	14/10/01 05:53:25 INFO SparkEnv: Registering BlockManagerMaster
	14/10/01 05:53:25 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141001055325-22ac
	14/10/01 05:53:26 INFO Utils: Successfully started service 'Connection manager for block manager' on port 56196.
	14/10/01 05:53:26 INFO ConnectionManager: Bound socket to port 56196 with id = ConnectionManagerId(localhost,56196)
	14/10/01 05:53:26 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
	14/10/01 05:53:26 INFO BlockManagerMaster: Trying to register BlockManager
	14/10/01 05:53:26 INFO BlockManagerMasterActor: Registering block manager localhost:56196 with 267.3 MB RAM
	14/10/01 05:53:26 INFO BlockManagerMaster: Registered BlockManager
	14/10/01 05:53:26 INFO HttpFileServer: HTTP File server directory is /tmp/spark-a33f43d9-37da-4c9e-a0b8-71b117b37012
	14/10/01 05:53:26 INFO HttpServer: Starting HTTP Server
	14/10/01 05:53:26 INFO Utils: Successfully started service 'HTTP file server' on port 54714.
	14/10/01 05:53:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
	14/10/01 05:53:27 INFO SparkUI: Started SparkUI at http://localhost:4040
	14/10/01 05:53:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
	14/10/01 05:53:29 INFO Executor: Using REPL class URI: http://localhost:33066
	14/10/01 05:53:29 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@localhost:36288/user/HeartbeatReceiver
	14/10/01 05:53:30 INFO SparkILoop: Created spark context..
	Spark context available as sc.
scala>

//正常に稼働した模様
//簡単な行数カウントを実行してみます
scala> val txtFile = sc.textFile("README.md")
	14/10/01 05:56:17 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
	14/10/01 05:56:17 INFO MemoryStore: ensureFreeSpace(156973) called with curMem=0, maxMem=280248975
	14/10/01 05:56:17 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 153.3 KB, free 267.1 MB)
	txtFile: org.apache.spark.rdd.RDD[String] = ../README.md MappedRDD[1] at textFile at <console>:12
scala> txtFile.count()
	14/10/01 05:56:29 INFO FileInputFormat: Total input paths to process : 1
	14/10/01 05:56:29 INFO SparkContext: Starting job: count at <console>:15
	14/10/01 05:56:29 INFO DAGScheduler: Got job 0 (count at <console>:15) with 1 output partitions (allowLocal=false)
	14/10/01 05:56:29 INFO DAGScheduler: Final stage: Stage 0(count at <console>:15)
	14/10/01 05:56:29 INFO DAGScheduler: Parents of final stage: List()
	14/10/01 05:56:29 INFO DAGScheduler: Missing parents: List()
	14/10/01 05:56:29 INFO DAGScheduler: Submitting Stage 0 (../README.md MappedRDD[1] at textFile at <console>:12), which has no missing parents
	14/10/01 05:56:29 INFO MemoryStore: ensureFreeSpace(2384) called with curMem=156973, maxMem=280248975
	14/10/01 05:56:29 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.3 KB, free 267.1 MB)
	14/10/01 05:56:29 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (../README.md MappedRDD[1] at textFile at <console>:12)
	14/10/01 05:56:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
	14/10/01 05:56:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1207 bytes)
	14/10/01 05:56:29 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
	14/10/01 05:56:29 INFO HadoopRDD: Input split: file:/usr/local/spark-1.1.0-bin-hadoop2.4/README.md:0+4811
	14/10/01 05:56:29 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
	14/10/01 05:56:29 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
	14/10/01 05:56:29 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
	14/10/01 05:56:29 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
	14/10/01 05:56:29 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
	14/10/01 05:56:30 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1731 bytes result sent to driver
	14/10/01 05:56:30 INFO DAGScheduler: Stage 0 (count at <console>:15) finished in 0.462 s
	14/10/01 05:56:30 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 423 ms on localhost (1/1)
	14/10/01 05:56:30 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
	14/10/01 05:56:30 INFO SparkContext: Job finished: count at <console>:15, took 0.828128221 s
	res0: Long = 141
//成功!

2-4.Hadoopで出力したデータをSparkで分析してみる

それでは、すべての準備が整ったところでHadoopとSparkでいろいろ触ってみることにします。

//SparkがHadoopのHDFSを参照できるかどうかテスト
$ hadoop fs -ls -R /output/
	-rw-r--r--   1 hdspark supergroup          0 2014-09-30 06:27 /output/_SUCCESS
	-rw-r--r--   1 hdspark supergroup         17 2014-09-30 06:27 /output/part-r-00000
$ $SPARK_HOME/bin/spark-shell

//ファイルの中身を参照できるか確認
scala> val txtFile = sc.textFile("hdfs://127.0.0.1:9000/output/part-r-00000")
scala> txtFile.count()
	…
	14/10/01 06:11:17 INFO SparkContext: Job finished: count at <console>:15, took 0.152107234 s
	res2: Long = 4
//成功!

ちゃんとHDFS上のファイルは参照できているようです。これならばHadoopで出力された結果の分析もできるはず!

//Hadoopの出力結果を使ってドリルダウン分析
//2-2で作成したHIVEのuryouテーブルを使用します。
$ hive
hive> SELECT COUNT(*) FROM uryou;
	・・・
	OK
	null
	Time taken: 19.417 seconds, Fetched: 1 row(s)

//テーブルuryouを形成してHDFS上のファイルとして出力してみます
hive> INSERT OVERWRITE DIRECTORY '/output/'
      > SELECT text,time,SUM(uryou) FROM uryou GROUP BY text,time ORDER BY text,time;
	・・・
	Moving data to: /output
	OK
	Time taken: 55.126 seconds
hive> exit;

//出力結果を確認
$ hadoop fs -ls -R /output/
	-rw-r--r--   1 hdspark supergroup     257499 2014-10-01 06:35 /output/000000_0
$ hadoop fs -cat /output/*
	一里塚橋00:004
	一里塚橋00:109
	一里塚橋00:202
	…

おや、区切り文字が出ない…?ローカルに持ってきて見てみると…

$ hadoop fs -get /output/*
$ view 000000_0
 一里塚橋^A00:00^A4
 一里塚橋^A00:10^A9
 一里塚橋^A00:20^A2
 …

調べてみると、どうやらHIVEのファイル出力は区切り文字が固定で「^A」になる模様。しかも変更できない、これは若干不便…。
とりあえず出力はできたので先に進んでみます。

//HDFS上のファイルのリネーム
$ hadoop fs -mv /output/000000_0 /output/hive_output
$ hadoop fs -ls -R /output/
 -rw-r--r-- 1 hdspark supergroup 257499 2014-10-01 06:35 /output/hive_output

//出力ファイル「hive_ourput」を、Sparkで読み込んでみたり、簡単にドリルダウン分析してみる
$ $SPARK_HOME/bin/spark-shell
scala> val hive_ourput = sc.textFile("hdfs://127.0.0.1:9000/output/hive_output")
scala> hive_ourput.count()
 …
 14/10/01 07:07:46 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
 res3: Long = 14976

//★ドリルダウン 渋谷区の降水量を全表示
scala> hive_ourput.filter(line => line.contains("渋谷")).foreach(println)
 …
 渋谷橋00:003
 渋谷橋00:105
 渋谷橋00:209
 渋谷橋00:306
 渋谷橋00:406
 渋谷橋00:505
 …
 渋谷橋23:509
 14/10/01 07:11:38 INFO Executor: Finished task 0.0 in stage 5.0 (TID 5). 1693 bytes result sent to driver
 14/10/01 07:11:38 INFO DAGScheduler: Stage 5 (foreach at <console>:15) finished in 0.118 s
 14/10/01 07:11:38 INFO SparkContext: Job finished: foreach at <console>:15, took 0.131803797 s

//★ドリルダウン 渋谷区の半年間の全降水を合計表示(フィールドは区切り文字「^A = \001」で分割しました)
scala> hive_ourput.filter(line => line.contains("渋谷")).map(line => line.split("\001")).map(parts => (parts(0), parts(2).toInt)).reduceByKey(_ + _,1).foreach(println)
 …
 (渋谷橋,856)
 14/10/01 07:18:36 INFO TaskSetManager: Finished task 0.0 in stage 13.0 (TID 12) in 32 ms on localhost (1/1)
 14/10/01 07:18:36 INFO TaskSchedulerImpl: Removed TaskSet 13.0, whose tasks have all completed, from pool
 14/10/01 07:18:36 INFO DAGScheduler: Stage 13 (foreach at <console>:15) finished in 0.013 s
 14/10/01 07:18:36 INFO SparkContext: Job finished: foreach at <console>:15, took 2.868253413 s
//成功!3秒足らずで完了しています。

Hadoopはその性質上、ジョブ起動にオーバーヘッド分の時間がありちょっとした集計でも時間がかかっていますが、Sparkはこの実装部分でも頑張っているとのことで、ほぼタイムラグ無しでジョブ実行できるそうです。
先ほどの計測地=”*渋谷*”の条件で、hiveでの抽出した時間と比べてみると…

hive> SELECT SUM(uryou) FROM uryou WHERE text LIKE '%渋谷%';
	・・・
	OK
	856
	Time taken: 66.421 seconds, Fetched: 1 row(s)

Spark 3秒 vs HIVE 66秒。およそ22倍の差がありました。
(もっともHIVEのパフォーマンスは、Javaで記述するMapper・Reducerと比べても変換がある分遅いようなので、純粋にHadoopとの比較とはいきませんが。)

3.最後に

HadoopとSparkの連携が簡単にでき、特別な意識なく分散処理が行えることがお分かりいただけましたでしょうか。
オープンソース実装のためかまだまだ不具合やバグがあり、構築するのに苦労しましたが、商用であればAmazonEMRなどのオールインワンのクラウドサービスも存在し、今後ますます需要が広がるのではと思います。
特にSparkではまだまだ日本語の記事も少なくノウハウも無いようですが、今後どんどん進化して使えるようになってくれれば面白くなると感じました。
拙い記事ですが、HadoopとSparkの凄さのほんの一端を、感じていただければ幸いです。