こんにちは。SI部の杉光です。

今回はAmazon Web Serviceの一つであるAmazon Elastic MapReduce(以下EMRと省略)を利用して
簡単に大規模データの分散処理を行う方法とEMRでサポートされているHadoopエコシステムの利用例をご紹介したいと思います。

Amazon EMRとは

EMRはAWSが提供するHadoopクラスターのサービスです。

クラウドサービスなので、わずか数分で仮想サーバーのクラスターを立ち上げ可能にし、
計算能力の需要に合わせクラスターを構成するサーバー数を調整することが可能です。
時間単位の課金なので、大量のデータを直ぐに短期間で処理したい場合も高いコストパフォーマンスで対応できます。

また、他AWSサービスとの連携が可能で、S3、RDSやDynamoDBに保存されたデータにクラスターからアクセスが可能です。
とりわけ、S3はHDFSと分けて意識することなくデータを保存することができます。

それでは、AWS Mamagement Consoleを使用してEMRの起動から
いくつかのHadoopエコシステム(アプリケーション)の利用方法をご説明したいと思います。
使用するアプリケーションは以下の通りです。
・Hive
・Spark(最近サポート対象となりました。)
・Hue

AWS ConsoleからEMR起動

AWS Management ConsoleからElastic MapReduceを選択します。

EMR

EMRの作成画面に移動したら、クイック設定から詳細クラスターの設定画面に移動します。

続いてHadoopディストリビューションの選択とインストールアプリケーションを追加します。
今回はAmazon「AMI version 3.8.0」を使用します。インストールするアプリケーションは、
AMIバージョンによって各々のアプリケーションバージョンも決まります。
今回の例では「Hadoop 2.4.0」、「Hive 0.13.1」、「Spark 1.3.1」、「Hue 3.7.1」をイントールします。

2.ソフトウェア設定

続いてハードウエアの構成です。起動するネットワーク(ネットワーク)と各ノードのインスタンスタイプと数を指定します。
スポットインスタンスも対応しており、通常のオンデマンドインスタンスより低価格で利用することが可能です。
スポットインスタンスは価格が変動するので、短期間に大量のインスタンスを稼動させたい場合にお薦めと言えます。
最低マスターノード1台、コアノード1台の構成からクラスターを起動可能ですが、今回はHueをインストールする都合上、コアノードを2台起動します。

3.ハードウェア構成

EMRでは、ブートストラップアクションとステップの追加を行うことができます。
今回は追加設定を行いませんが、簡単に触れておきます。
ブートストラップアクションは主にHadoopパラメータのカスタマイズや追加のアプリケーションを
インストールする場合に使用します。
ステップの追加はMapReduceジョブや各アプリケーションのプログラム起動支持ができます。
また、ステップ完了後にクラスターを自動終了する設定が可能です。

4.ブートストラップアクション

「セキュリティとアクセス」、「タグ」やクラスター名の設定など特に触れませんが適所設定してください。
今回はクラスター起動後にマスターノードにsshでアクセスするので「EC2キーペア」を設定します。

一通り設定が完了したらクラスターを起動します。

5.クラスターステータス

クラスターのステータスが実行中(Wating)状態になったら利用可能となります。 早速マスターノードへ接続します。

 >ssh hadoop@hostname -i emr-key.pem

Hiveを使用して分散処理を実行

MapReduceをSQLライクに実行できるHiveを使用します。
コマンドはmysqlクライアントを使う感覚で利用できます。

今回は以下のステップでAmazon S3のデータの読み込み/書き込みを行っていきます。

① S3のURLに対してテーブルを作成(読み込み用と書き込み用テーブル2つ)
② ①で作成したテーブルに対してデータの検索を行う。
③ 読み込み用のテーブルから検索・集計した結果をテーブル書き込みを行う。

S3バケットは「emr-testdata」を使用します。
s3://emr-testdata/tsv/input/ … 読み込み用テーブル
s3://emr-testdata/tsv/output/ … 書き込み用テーブル
s3構造

EMRを起動するIAMは上記バケットの読み取り/書き込み権限が付与されています。
尚、データは横浜市の天気情報(1985年〜2015年)を使用しています。
http://www.data.jma.go.jp/gmd/risk/obsdl/


-- マスターノードからhiveの起動
[hadoop@ip-172-31-24-87 ~]$ hive

Logging initialized using configuration in jar:file:/home/hadoop/.versions/hive-0.13.1-amzn-3/lib/hive-common-0.13.1-amzn-3.jar!/hive-log4j.properties

-- DB作成
hive> create database testdb;
OK
Time taken: 6.008 seconds

-- DB切り替え
hive> use testdb;
OK
Time taken: 0.092 seconds

-- 読み込み用のテーブルを作成
hive> create external table weather(
 > date string,
 > Temperature float,
 > Precipitation float,
 > Summary string
 > )
 > PARTITIONED BY(Age string)
 > ROW FORMAT DELIMITED
 > FIELDS TERMINATED BY '\t'
 > LINES TERMINATED BY '\n'
 > LOCATION 's3://emr-testdata/tsv/input/';
OK
Time taken: 12.503 seconds
hive>

-- 年代毎に分かれているフォルダに対してPartitionを作成
 > ALTER TABLE weather ADD PARTITION (Age='1980')
 > LOCATION 's3://emr-testdata/tsv/input/1980/';
OK
Time taken: 1.101 seconds
hive>
 > ALTER TABLE weather ADD PARTITION (Age='1990')
 > LOCATION 's3://emr-testdata/tsv/input/1990/';
OK
Time taken: 0.665 seconds
hive>
 > ALTER TABLE weather ADD PARTITION (Age='2000')
 > LOCATION 's3://emr-testdata/tsv/input/2000/';
OK
Time taken: 0.529 seconds
hive>
 > ALTER TABLE weather ADD PARTITION (Age='2010')
 > LOCATION 's3://emr-testdata/tsv/input/2010/';
OK
Time taken: 0.546 seconds

-- Partition毎の件数確認
hive> select age,count(*) from weather group by age;
MapReduce Jobs Launched:
Job 0: Map: 1 Reduce: 1 Cumulative CPU: 8.39 sec HDFS Read: 405 HDFS Write: 40 SUCCESS
Total MapReduce CPU Time Spent: 8 seconds 390 msec
OK
1980 3653
1990 3652
2000 3653
2010 2053
Time taken: 52.417 seconds, Fetched: 4 row(s)

-- サンプルでデータ内容を確認
hive> select * from weather limit 5;
OK
1980/1/1 6.7 0.0 曇 1980
1980/1/2 4.6 2.5 晴 1980
1980/1/3 6.4 16.5 曇時々雨 1980
1980/1/4 9.8 13.5 曇一時雨 1980
1980/1/5 4.8 2.0 曇時々雨 1980
Time taken: 0.224 seconds, Fetched: 5 row(s)</pre>

-- 書き込み用のテーブルを作成
hive> create external table aggregate(
 > date string,
 > Summary string,
 > Count int
 > )
 > ROW FORMAT DELIMITED
 > FIELDS TERMINATED BY '\t'
 > LINES TERMINATED BY '\n'
 > LOCATION 's3://emr-testdata/tsv/output/';
OK
Time taken: 0.311 seconds

-- 年別に「晴」の文字が含まれる日を集計
hive> insert into table aggregate select substr(date,0,4),"晴",count(*) from weather where Summary like "%晴%" group by substr(date,0,4);
Loading data to table testdb.aggregate
Table testdb.aggregate stats: [numFiles=0, numRows=36, totalSize=0, rawDataSize=432]
OK
Time taken: 60.287 seconds

-- 年別に「雨」の文字が含まれる日を集計
hive> insert into table aggregate select substr(date,0,4),"雨",count(*) from weather where Summary like "%雨%" group by substr(date,0,4);
Loading data to table testdb.aggregate
Table testdb.aggregate stats: [numFiles=0, numRows=36, totalSize=0, rawDataSize=419]
OK
Time taken: 60.547 seconds

-- 年別に「雪」の文字が含まれる日を集計
hive> insert into table aggregate select substr(date,0,4),"雪",count(*) from weather where Summary like "%雪%" group by substr(date,0,4);
Loading data to table testdb.aggregate
Table testdb.aggregate stats: [numFiles=0, numRows=34, totalSize=0, rawDataSize=341]
OK
Time taken: 59.515 seconds

-- 書き込み用テーブルの内容を確認
hive> select * from aggregate order by date;
OK
1980 雪 5
1980 晴 177
1980 雨 101
〜 省略 〜
2015 雨 72
2015 雪 5
2015 晴 141
Time taken: 48.088 seconds, Fetched: 106 row(s)

-- 年別に平均気温と降水量を集計
hive> select substr(date,0,4),round(avg(Temperature),1),round(avg(Precipitation),1) from weather group by substr(date,0,4);
MapReduce Jobs Launched:
Job 0: Map: 1 Reduce: 1 Cumulative CPU: 10.78 sec HDFS Read: 405 HDFS Write: 504 SUCCESS
Total MapReduce CPU Time Spent: 10 seconds 780 msec
OK
1980 15.0 5.0
1981 15.0 4.3
1982 15.3 4.9
1983 14.9 4.8
〜 省略 〜
2012 15.9 5.5
2013 16.7 4.2
2014 16.2 5.1
2015 16.2 4.1
Time taken: 57.248 seconds, Fetched: 36 row(s)

S3のデータを参照して集計した結果を別のS3フォルダに書き込むことができました。
(フォルダ内容は後述のHueで確認します。)
また、HiveではPartitionを指定することにより、検索対象のファイルを限定することができ、
検索効率が良くなります。同時にデータが大量になるほどパフォーマンスに影響をもたらすので必須の設定内容とも言えます。
実際の運用では日付毎にフォルダを分けて、Partitionを指定する例が多いようです。

Sparkの実行

続いて、Sparkで処理を行いたいと思います。
Hiveで作成したテーブル定義を利用したいのでSparkSQLを起動します。
検索クエリもHiveと同じものを実行します。


[hadoop@ip-172-31-24-87 ~]$ spark-sql
spark-sql>
-- testdbへ切り替え
spark-sql> use testdb;

OK
-- table一覧を確認
spark-sql> show tables;
aggregate false
weather false
Time taken: 0.479 seconds, Fetched 2 row(s)
-- Hiveと同じように年別に「晴」の文字が含まれる日を集計
spark-sql> select substr(date,0,4),"晴",count(*) from weather where Summary like "%晴%" group by substr(date,0,4);

15/08/16 13:29:21 INFO scheduler.DAGScheduler: Stage 6 (collect at SparkPlan.scala:84) finished in 6.545 s
15/08/16 13:29:21 INFO scheduler.DAGScheduler: Job 4 finished: collect at SparkPlan.scala:84, took 13.574099 s
2010 晴 245
2011 晴 249
2012 晴 237
2013 晴 256
〜 省略 〜
2006 晴 207
2007 晴 247
2008 晴 244
2009 晴 218

15/08/16 13:29:21 INFO scheduler.StatsReportListener: 45.0 ms 48.0 ms 50.0 ms 55.0 ms 60.0 ms 75.0 ms 94.0 ms 99.0 ms 145.0 ms
Time taken: 15.907 seconds, Fetched 36 row(s)
15/08/16 13:29:21 INFO CliDriver: Time taken: 15.907 seconds, Fetched 36 row(s)

spark-sql> select substr(date,0,4),round(avg(Temperature),1),round(avg(Precipitation),1) from weather group by substr(date,0,4);

15/08/16 13:30:15 INFO scheduler.DAGScheduler: Stage 8 (collect at SparkPlan.scala:84) finished in 12.058 s
15/08/16 13:30:15 INFO scheduler.DAGScheduler: Job 5 finished: collect at SparkPlan.scala:84, took 19.817250 s

2010 16.6 5.1
2011 16.4 4.3
2012 15.9 5.5
2013 16.7 4.2
〜 省略 〜
2006 16.0 5.1
2007 16.6 4.0
2008 16.1 5.2
2009 16.3 5.2
Time taken: 20.693 seconds, Fetched 36 row(s)
15/08/16 13:30:15 INFO CliDriver: Time taken: 20.693 seconds, Fetched 36 row(s)
spark-sql> select * from aggregate order by date;

1980 雨 101
1980 晴 177
1981 雪 3
1981 雨 93
〜 省略 〜
2014 雪 7
2015 雪 5
2015 雨 72
2015 晴 141
Time taken: 6.224 seconds, Fetched 106 row(s)
15/08/16 13:30:58 INFO CliDriver: Time taken: 6.224 seconds, Fetched 106 row(s)

Hive同様のクエリが実行できましたね。
Sparkはインメモリで処理できる分早かったですが、データが巨大な場合はまた違った結果となるかもしれません。

Hueを使用してWeb画面からクエリを実行

HueとはHadoop用のWebGUIです。EMR上で稼動するHueは以下の機能があります。

・S3とHDFSのファイルブラウザ。データ参照や移動が行える。
・Hive/Pigのエディタとインタラクティブ実行
・Metastore Manager …Hiveメタストアの表示と操作
・Job Browser … Hadoop  Jobのステータスを管理

ビッグデータをより多くの人にというコンセプトで開発されてり、プログラム知識の無い人でも使い易いツールとなっています。

Hueへの接続方法ですがポートフォワーディング使用してSSHトンネルを作成し、
ブラウザのプロキシ管理アドオン設定する必要があります。
設定方法はここでは割愛しますが、詳細はAmazonドキュメントに記載があります。
https://docs.aws.amazon.com/ja_jp/ElasticMapReduce/latest/DeveloperGuide/accessing-hue.html

HueのURLアクセスします。初回はユーザーの作成を行うことになります。

8.Hueログイン画面

File Browserでs3やHDFSでマウントした内容を確認できます。
以下はHiveで集計した結果のファイルを参照しています。

9.Hueファイルブラウザ

Hive Editorを使って先ほどのterminalで実行した時と同じようにクエリを実行してみます。
実行結果のほか、クエリの実行履歴を参照することもできます。

9.HueHiveEditorクエリ実行結果

Chartから簡単なグラフも見ることができます。

10.HueHiveEditorグラフ

最後に

今回はEMRを使用することで面倒なHadoop構築や各アプリケーションのインストールがボタンひとつで可能になり、そしてSQLライクなクエリーを使うことで分散処理を簡単に実行できることがお分かり頂けたと思います。

さらにEMRの利点として、必要な時に必要な分だけインスタンスを稼動可能なので非常にスケールメリットがあります。(これはクラウドの謡い文句になってしまいますが…)

また、Hadoopを試しに使ってみる環境として、Cloudera社が提供しているCDHが仮想マシンとして利用できるのでお薦めです。http://www.cloudera.com/content/cloudera/en/downloads.html

EMRよりも多くのアプリケーションがインストールされており、かつVersionも新しいです。とりあえず、新しいものを触ってみたいという方はこちが良いかと思います。

分散処理基盤を選択する場合の参考にして頂ければと思います。