こんにちは。 キャスレーコンサルティングのLS(リーディング・サービス)部 杉光です。
今回は、Amazon Web ServicesのAWS Glueを使って、
GUIから簡単にETL(Extract、Transform、Load)を行いたいと思います。
AWS Glueの機能概要と特徴
AWS Glueとは
データ抽出、変換、ロード(ETL)とデータカタログ管理を行う、完全マネージド型サービスです。
主に以下の機能があります。
Spark Jobの作成、実行と管理
- Python、ScalaのSpark scriptをJob登録して、実行、及び実行の管理。 実行はトリガー、オンデマンド、スケジュールを指定でき、 実行されたJobのモニタリングや、アラートなども行うことができます。
- Glue上でScriptの生成、編集が行えます。
データの検索とカタログ情報の管理
- AWSサービス上に保存されたデータの検索とカタログ情報の登録。 データの保存場所を指定するだけで、Glueでデータ検索を行い、 ファイルの識別と解析、スキーマ定義や関連するメタデータ(※)をデータカタログに保存します。 ※[扱えるカタログ情報]…Aurora、RDS、Redshift、S3、Athena、EMR、Redshift Spectrum、Hive Metastore
- スキーマ変更のバージョン管理が行えます。
- データ検索のスケジュール化、トリガー実行が行えます。
AWSサービスにおけるGlueの役割
AWSサービスではいわゆるビッグデータにおける、収集、ETL、分析などを行うことができるサービスが様々ありますが、 Glueの役割は、データ収集後からデータをデータストアにロードする前に行う、データ変換とカタログ情報の管理です。
データ変換とは、例えばファイルフォーマットの変換、ノイズデータのクレンジング、不要項目の除外などがあります。 データ内容やフォーマットによって、処理stepや方法は変わってきますが、分析官が分析しやすくするデータ、 又は、大量データの集計・統計を行うためにパフォーマンスを発揮できるデータを生成することが目的になります。
それでは、Glueでどんなことができるのかを実際に試してお見せしたいと思います。
検証
まず、S3上にあるTSVデータを検索し、Glueにカタログ情報に登録します。 そして、そのカタログ情報から簡単な処理(項目を絞り、フォーマット変換)を行うところまで、実践したいと思います。
①データ検索及びカタログ情報登録
GlueのClawer(クローラー)という機能を使います。 クローラーは、S3オブジェクト、JDBC接続先のDB(RedshiftやRDS)に対して 自動にスキーマ情報(項目、データ型、ファイルタイプ、パーティション)を推測して、カタログ情報を登録してくれます。
それでは、S3にあるデータに対してクローラーでテーブルを作成します。
準備
テストデータには、気象庁の過去の天気データを用意します。 S3には、以下のように都市別/年代別に配置しています。
データ項目
- 日付 – yyyy/mm/dd
- 日中気温 — 例)26.3
- 付加情報
- 付加情報
- 降水量 — 例)5.0
- 付加情報
- 付加情報
- 付加情報
- 天気概状 — 例)曇後雨
- 付加情報
- 付加情報
※付加情報は、ダウンロードしたデータの状態を表す品質情報などで、本検証では利用しない項目です。
①-1 クローラーの登録
GlueのコンソールからCrawlersを選択し、「Add Crawler」を押します。
クローラー名を入力します。
今回は、S3のデータに対してスキーマ定義を行うので、Data Storeは「S3」を選択します。
Include Pathに、対象データのS3 Locationを指定します。
複数の対象データを指定することができますが、今回は一つだけにしますので、「No」を選択します。
クローラーで使用する、IAM Roleを選択します。
画面に記載の通り、使用するRoleにはManaged policyである「AWSGlueServiceRole」と参照する
S3に対してのアクセス権限を付与する必要があり、その権限をもつRoleを作成しました。
スケジューラを選択します。今回は、その場で実行するので「Run On demand」にします。
スキーマ情報の登録先のDB名と、テーブル名を入力します。
最後に、設定内容のプレビュー画面になり、「Finish」を押せば登録完了です。
①-2 クローラーの実行
一覧に先程登録したクローラーが表示されるので、対象のクローラーを選択し「Run crawler」を押して実行します。
実行後は、下画面のようにステータス、実行Log、テーブルの追加/更新結果などが確認できます。
(実行結果によって、1テーブル追加されていることがわかります。)
①-3 カタログ情報の確認
クローラーの実行結果、Data Catalogの画面からは以下のようにテーブルが作成されていることが確認できます。
スキーマ情報
データ型も推測され、パーティション(city,age)も登録されていることがわかります。
これらの項目は、この画面から編集を行うこともできます。
②変換Jobの作成と実行
スキーマ情報が登録ができたので、このデータの不要項目を削除し、ファイルフォーマットを変換するJobを作成します。
今回はTSV形式のファイルから、Parquet形式に変換したいと思います。
②-1 Spark Jobの作成
Spark Jobの作成を、Glueのコンソールから生成したいと思います。
Jobsのメニューから「Add Job」を押します。
Job名の入力、Roleの選択、Script言語の選択、Script名の入力等を行います。
今回Script言語は「Python」を選択します。
Roleはクローラーと同じものを選択しました。
(ETL後のアウトプット先や、Script配置先を考慮して、権限を設定して下さい。)
また、「This run jobs」の選択ではScriptをGlueによって、
自動生成してくれる項目(A proposed script generated by AWS Glue)を選択します。
インプットとなるデータソース指定します。
先程、クローラーで登録したテーブルを選択します。
アウトプット先を指定します。
インプットと同じS3バケットに「parquet」というフォルダを作成し、そこに出力するようにします。
インプットとアウトプットデータの項目をマップする、選択画面です。
アウトプットのデータは、不要な項目を省きます。
(インプットデータの項目名は、カタログ情報画面で予め編集しています。)
残念なことに、マップ対象の項目にパーティションが出てきません…後程scriptを修正することで対応します。
項目のマップ選択まで終えると、Scriptのエディタ画面に遷移します。
直ぐにJobを実行することもできますが、 パーティション項目もインプットデータと同じように出力したいので、Scriptを少し編集します。
変換するデータの項目マッピングを、定義します。
# 修正前 applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("date", "string", "date", "string"), ("temperature", "double", "temperature", "double"), ("precipitation", "double", "precipitation", "double"), ("summary", "string", "summary", "string")], transformation_ctx = "applymapping1") # 修正後 # マッピング項目にPartitionの'city'と'age'を追加 applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("date", "string", "date", "string"), ("temperature", "double", "temperature", "double"), ("precipitation", "double", "precipitation", "double"), ("summary", "string", "summary", "string"), ("city", "string", "city", "string"), ("age", "string", "age", "string")], transformation_ctx = "applymapping1")
ここでは、データをS3に書き込む処理になりますが、 Glue独自で定義しているDynamicFrameでは、
Partitionの出力が出来ないため、 SparkのDataFrameに変換してからS3に書き込みます。
# コメント行 もともと42行目にあった部分をコメント化 datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://glue-sandbox-testdata/parquet"}, format = "parquet", transformation_ctx = "datasink4") # 追加行 df = dropnullfields3.toDF() #DynamicFrameからDataFrameへ変換 df.write.partitionBy("city","age").mode("overwrite").parquet("s3://glue-sandbox-testdata/parquet",compression="snappy")
編集が完了したら、「Save」を押します。
②-2Jobの実行
登録したJobの実行を行います。
トリガー設定やスケジューリングが行えますが、今回はオンデマンドで実行します。
一覧から対象のJobを選択し「Action」→「Run Job」で実行できます。
実行中・実行後のJobステータスを下段で確認することができます。(下画面では3回Job実行後の結果となります。)
Job実行後のS3バケットの内容です。
インプットデータと同じパーティション構成で、Parquetファイルで出力されています。
③Athenaでデータ参照
Glueでカタログ登録したテーブルを、AWS Athenaで参照したい思います。
Athenaについては、以前ブログでご紹介していますので、そちらを参照頂ければと思います。
⇒AWS Athenaで集計して性能をみる
以下はAthnaのコンソール画面になります。
glue-sandbox DBにGlueで作成したテーブルがあることが確認できます。
2つのテーブルはファイルフォーマットが異なるだけなので、クエリで同じ結果が帰ってくることが確認できます。
変換前のtsvデータに対してのSELECT結果 (結果は下段のResults)
変換後のparquetデータに対してのSELECT結果
さいごに
AWS Glueを使って、データの検索、カタログ情報の登録、データ変換処理を簡単に行うことができました。
これらのことを全てマネージドで行ってくれることで導入が容易になりますし、
サーバー管理要らずで運用コストも、かなり省けるのではないかと思います。
個人的にはクローラーとJob実行をトリガー、スケジュールで行える機能は嬉しいです。
また、カタログ情報がGlueに統合されることで、
AWSにおけるデータ分析関連サービスの中心はGlueになっていくように感じます。
大規模データを扱うときのパフォーマンスが気になりますが、実運用に向けて検討していきたいと思います。
最後までお読みいただき、ありがとうございました。
参考文献
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/what-is-glue.html https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html