こんにちは!
CSVIT事業部の鈴木 和翔(すずき かずと)です。
私は日々、クライアント先に常駐してAWS関連のシステム開発に携わっています。
今回は、現場で使用しはじめたAWS Glue(以下、Glueとします)というサービスを紹介します。
このGlueは、PythonとScalaの2言語に対応しており、
どちらの方がGlueを扱うにあたって性能有利か気になったため、検証したいと思います。
そもそもGlueとは?
抽出、変換、ロード (ETL) を行う完全マネージド型のサービスで、
お客様の分析用データの準備とロードを簡単にします。
AWSマネジメントコンソールで数回クリックするだけで、ETLJobを作成および実行できます。
Glue では、AWSに保存されたデータを指定するだけでGlue によるデータ検索が行われ、
テーブル定義やスキーマなどの関連するメタデータがDataCatalogに保存されます。
DataCatalogに保存されると、データはすぐに検索かつクエリ可能になり、
ETLに使用できるようになります。
(参考: https://aws.amazon.com/jp/glue/)
簡単にいうとETLのマネージドサービスなのですが、
内部でSparkを使用しており、分散処理をさせることによって大容量のデータでも
かなり早い速度でETLを行う事が出来ます。
そこで、RDSからデータを取得し、
Glueでデータ変換を行い、S3(AWSのクラウド型オブジェクトストレージサービス)に格納するまでの
簡単なアーキテクチャーで、Glueを検証していきたいと思います。
今回のGlue検証
Glueでは、DynamicFrameオブジェクトで操作を行っていきます。
DynamicFrameからデータの書き込みを行う際には、
DataCatalogのデータベース/テーブルを出力先に指定します。
Glueから直接ファイル出力を行う際には、
DynamicFrameオブジェクトからDataFrameオブジェクトに変換し、
DataFrameからS3にファイル出力させる必要があります。
以上を踏まえ、
・DynamicFrame(DataCatalog)のデータ出力
・DynamicFrame ⇒ DataFrameのデータ出力
以上の検証を、それぞれの言語で行っていくこととします。
全4パターンの検証内容となります。
今回、検証していくアーキテクチャーを簡単に図に表してみました。
検証データ作成
まずは、検証に使うテストデータの生成を行います。
今回は、RDSを使用しますので、RDSを作成していきます。
ひとまず、簡単設定でRDSを作成しました。(t2.midium)
パブリックの設定はオフにし、VPC経由で接続していきます。
RDSの作成が完了しましたら、
RDSと同じVPC内にEC2を作成し、
作成したEC2からRDSに接続して、SQL文でデータベースを作成していきます。
(ここでは suzukidbという名前で作成しています。)
[root@ip-xxx-xx-xx-xx ~]# mysql -h suzuki.cluster-************.us-west-2.rds.amazonaws.com -P 3306 -u admin -p Enter password: Welcome to the MariaDB monitor. Commands end with ; or \g. Your MySQL connection id is 101 Server version: 5.6.10 MySQL Community Server (GPL) Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. MySQL [(none)]> create database suzukidb; Query OK, 1 row affected (0.00 sec)
同じVPC内にLambdaを作成し、以下のコードを使用して、DB内に検証用のデータを作成していきます。
nameカラムに、suzuki, matsuoka, shimadaという名前3種類と、
Ageカラムに1~100000までの数値を登録し、全300000件のデータを用意します。
rds-lambda.py
import pymysql def my_handler(event, context): conn = pymysql.connect(host="suzuki.cluster-************.us-west-2.rds.amazonaws.com", user="admin", password="*******", db="suzukidb", connect_timeout=5) with conn.cursor() as cur: ## テーブル作成 cur.execute("create table testtable (Name varchar(255) NOT NULL, Age int NOT NULL, Sex varchar(255) NOT NULL)") sql = "INSERT INTO testtable (name, age, sex) VALUES (%s, %s, %s)" ## テストデータ(300000件挿入) for num in range(100000): cur.execute(sql, ("Suzuki", num, "man")) for num in range(100000): cur.execute(sql, ("Shimada", num, "women")) for num in range(100000): cur.execute(sql, ("Matsuoka", num, "man")) conn.commit()
LambdaからPythonを実行し、ステータスが完了になりましたら
実際にデータを確認していきます。
再度同じEC2からmysqlで接続し、countにてカラムが登録されていることを確認します。
MySQL [suzukidb]> select count(name),count(age),count(sex) from testtable; +-------------+------------+------------+ | count(name) | count(age) | count(sex) | +-------------+------------+------------+ | 100000| 100000| 100000| +-------------+------------+------------+ 1 row in set (0.02 sec)
上記のように表示されたら、検証用データの作成は完了です。
GlueJob作成
早速、GlueJob(以下、Jobとします)の作成に入っていきます。
Glueでは、実際に動作させるJobを作成する前に
RDSとの接続の設定を行うconnectorを作成し、
クローラを使用してGlueデータベースを作成し、
それを基にGlueJobを作成します。
分かりづらいので、一つ一つ操作して行きます。
Connectorの作成
まず、RDSとの接続を行うConnectorを作成していきます。
AWSコンソールから、Glueを選択し、Glueのページに移動します。
左の選択画面から接続を押下し、RDSとの接続設定を行います。
選択すると、以下のような画面が出力されます。
「接続の追加」ボタンを押下します。
接続名をtest-suzuki-db-connectorに設定し、
接続タイプをJDBCに設定します。
最終的な設定内容は以下です。
設定内容に不備がなければ、「完了」ボタンを押下します。
これで、RDSの接続部分の設定が完了しました。
「接続のテスト」を押下することで、接続確認ができます。
クローラの作成
クローラとは、RDB等データソースの接続情報から、
DataCatalog(Glueテーブル)を自動で生成してくれる機能です。
手動でDataCatalogを生成することも出来ますが、
データソース(RDB)が存在する為、ここではクローラを使用します。
コンソール画面左一覧から、クローラを選択し、「クローラの追加」ボタンを押下します。
以下の画面に遷移されますので、クローラの名前を入力して「次へ」ボタンを押下します。
source typeに、Data storesを選択し、「次へ」ボタンを押下します。
データストアにJDBCを指定し、先ほど作成したconnectorを選択した後に「次へ」ボタンを押下します。
別のデータストアの追加はしないので、いいえを選択し「次へ」を押下します。
IAMロールを選択し、「次へ」ボタンを押下します。
(ここでのIAMロールでは、RDSの接続を許可しているロールを選択してください)
詳細画面が表示されますので、問題無ければ「完了」ボタンを押下してください。
成功すると、先ほどRDSで作成したテーブルのDataCatalogが作成されています。
出力側Glueテーブル定義
そうしましたら次は、
出力側(S3)のDataCatalogを作成していきます。
出力側は、元となるデータソースが無いため、
クローラを使用せず、手動でDataCatalogを定義します。
DataCatalog欄のデータベースを選択し、
「データベースの追加」を選択します。
以下のようにデータベースが作成されますので、
suzuki-s3-dbのテーブルを押下します。
まだテーブル情報が追加されていないため、「テーブルの追加」ボタンを押下し、
「手動でのテーブルの追加」を選択します。
テーブル名を入力し、データベースを選択して「次へ」を押下します。
出力先のS3インクルードパスを入力し、「次へ」を押下します。
確認画面へ遷移しますので、
内容が合っているかを確認し、「完了」ボタンを押下してください。
以上で、テーブルの手動追加は完了となりました。
GlueJobの作成
それでは、下準備が完了しましたので、
Jobの定義をします。
左のカテゴリから「ジョブ」を選択し、「ジョブの追加」ボタンを押下します。
ジョブプロパティを、以下のように設定し「次へ」を押下します。
(基本デフォルトのままですが、GlueVersionでは言語PythonのSpark 2.4を選択しています。)
入力データソースとして、先ほどクローラで作成したsuzuki-rdb_dbを選択します。
S3出力用に手動で作成した、suzuki-s3-dbを選択した後、「次へ」を押下します。
変換方式として、
nameカラムに入っている値をsexカラムに挿入し、
sexカラムに入っている値を、nameカラムに挿入するように設定します。
以下の画面に遷移し、検証パターン①のPythonGlueコードが自動で作成されます。
検証パターン① PythonDynamicFrame実装(自動作成)
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "suzuki-rdb-db", table_name = "suzukidb_testtable", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "suzuki-rdb-db", table_name = "suzukidb_testtable", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("name", "string", "sex", "string"), ("age", "int", "age", "int"), ("sex", "string", "name", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("name", "string", "sex", "string"), ("age", "int", "age", "int"), ("sex", "string", "name", "string")], transformation_ctx = "applymapping1") ## @type: SelectFields ## @args: [paths = ["name", "age", "sex"], transformation_ctx = "selectfields2"] ## @return: selectfields2 ## @inputs: [frame = applymapping1] selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["name", "age", "sex"], transformation_ctx = "selectfields2") ## @type: ResolveChoice ## @args: [choice = "MATCH_CATALOG", database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "resolvechoice3"] ## @return: resolvechoice3 ## @inputs: [frame = selectfields2] resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "resolvechoice3") ## @type: DataSink ## @args: [database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "datasink4"] ## @return: datasink4 ## @inputs: [frame = resolvechoice3] datasink4 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice3, database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "datasink4") job.commit()
続いて、同様にGlueにてScalaコードでの自動生成を行います。
先ほどのJob作成と同様の設定とし、
以下の設定項目だけScalaを選択すれば、ScalaGlueコードが自動作成されます。
検証パターン② ScalaDynamicFrame実装
(自動作成)
import com.amazonaws.services.glue.ChoiceOption import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.ResolveSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // @type: DataSource // @args: [database = "suzuki-rdb-db", table_name = "suzukidb_testtable", transformation_ctx = "datasource0"] // @return: datasource0 // @inputs: [] val datasource0 = glueContext.getCatalogSource(database = "suzuki-rdb-db", tableName = "suzukidb_testtable", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame() // @type: ApplyMapping // @args: [mapping = [("name", "string", "sex", "string"), ("age", "int", "age", "int"), ("sex", "string", "name", "string")], transformation_ctx = "applymapping1"] // @return: applymapping1 // @inputs: [frame = datasource0] val applymapping1 = datasource0.applyMapping(mappings = Seq(("name", "string", "sex", "string"), ("age", "int", "age", "int"), ("sex", "string", "name", "string")), caseSensitive = false, transformationContext = "applymapping1") // @type: SelectFields // @args: [paths = ["name", "age", "sex"], transformation_ctx = "selectfields2"] // @return: selectfields2 // @inputs: [frame = applymapping1] val selectfields2 = applymapping1.selectFields(paths = Seq("name", "age", "sex"), transformationContext = "selectfields2") // @type: ResolveChoice // @args: [choice = "MATCH_CATALOG", database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "resolvechoice3"] // @return: resolvechoice3 // @inputs: [frame = selectfields2] val resolvechoice3 = selectfields2.resolveChoice(choiceOption = Some(ChoiceOption("MATCH_CATALOG")), database = Some("suzuki-s3-db"), tableName = Some("testtable"), transformationContext = "resolvechoice3") // @type: DataSink // @args: [database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "datasink4"] // @return: datasink4 // @inputs: [frame = resolvechoice3] val datasink4 = glueContext.getCatalogSink(database = "suzuki-s3-db", tableName = "testtable", redshiftTmpDir = "", transformationContext = "datasink4").writeDynamicFrame(resolvechoice3) Job.commit() } }
以上で、Scala,Python両言語の
DynamicFrame実装が完了となりました。
DynamicFrame性能検証
DynamicFrame実装したJobを、
Pythonの場合とScalaの場合とでそれぞれ実行し、
SparkUIにて性能を5回ずつ計測してみました。
GlueJobの結果をSparkUIで確認する方法は以下をご覧ください
(参考: https://aws.amazon.com/jp/blogs/news/how-to-use-spark-ui/)
5回計測したDuration計測値を表にまとめました。
検証パターン①と検証パターン②を比べてみても、
性能差異はほとんど見られませんでした。
DataFrame実装
先ほど作成したJobスクリプトを編集し、
DynamicFrameのtoDF()メソッドにて
DataFrameに変換し、
withColumnメソッドにてnameカラムとsexカラムを入れ替えた実装とします。
検証パターン③ PythonDataFrame実装
---------------パターン①とここまで同コードの為省略--------------- df = DynamicFrame.toDF(glueContext.create_dynamic_frame .from_catalog("suzuki-rdb-db", "suzukidb_testtable")) df.withColumn('sex1',F.lit(F.col("sex"))) .withColumn("sex",F.lit(F.col("name"))) .withColumn("name",F.lit(F.col("sex1"))) .select("name", "age", "sex") .write.json("s3://" + writeBucket) job.commit()
検証パターン④ ScalaDataFrame実装
---------------パターン②とここまで同コードの為省略--------------- val df = glueContext.getCatalogSource("suzuki-rdb-db","suzukidb_testtable") .getDynamicFrame().toDF() df.withColumn("sex1",lit($"sex")) .withColumn("sex",lit($"name")) .withColumn("name",lit($"sex1")) .select("name", "age", "sex") .write.json("s3://" + writeBucket) Job.commit() } }
以上2言語のJobスクリプトを編集し保存すれば完了です。
DataFrame性能検証
DataFrame実装したJobを、
Pythonの場合とScalaの場合とでそれぞれ実行し、
SparkUIにて性能差を比較してみました。
こちらも5回ずつ計測を行い、
Duration計測値を表にまとめました。
結論
結果的に、DynamicFrameを使用した
検証パターン①と②のDuration値に、殆ど差異はありませんでしたが、
DataFrameを使用した検証パターン③と④のDuration値は
75th percentileの値が、Scalaは0.9sの処理速度だったのに対し、
Pythonでは4sとなったため、多少の劣化が見られた様に思いました。
よって、本検証では多少の差ではありますが、
DynamicFrameでは性能差異は現れなかったものの、
GlueでDataFrameを使用する場合は、Scala有利の結果となりました。
そして多少ではありますが、検証パターン①②と③④を比べると
検証パターン①②のDynamicFrameのみで実装した方が、
全体的に処理速度が速いという結果が分かりました。
今回はここまでの検証となりますが、
DynamicFrameとDataFrameの内部のどこで処理速度に差が出ているのか
機会があれば検証してみたいと思います。
最後に
多少ではあるものの、Scalaの方が性能有利の結果となりました。
Sparkは、基本Scala言語で作成されている為、
個人的には思ったとおりの検証結果となり満足です。
最後までお読みいただき、ありがとうございました!