こんにちは!
CSVIT事業部の鈴木 和翔(すずき かずと)です。

私は日々、クライアント先に常駐してAWS関連のシステム開発に携わっています。
今回は、現場で使用しはじめたAWS Glue(以下、Glueとします)というサービスを紹介します。

このGlueは、PythonとScalaの2言語に対応しており、
どちらの方がGlueを扱うにあたって性能有利か気になったため、検証したいと思います。

そもそもGlueとは?

抽出、変換、ロード (ETL) を行う完全マネージド型のサービスで、
お客様の分析用データの準備とロードを簡単にします。
AWSマネジメントコンソールで数回クリックするだけで、ETLJobを作成および実行できます。
Glue では、AWSに保存されたデータを指定するだけでGlue によるデータ検索が行われ、
テーブル定義やスキーマなどの関連するメタデータがDataCatalogに保存されます。
DataCatalogに保存されると、データはすぐに検索かつクエリ可能になり、
ETLに使用できるようになります。
(参考: https://aws.amazon.com/jp/glue/)

簡単にいうとETLのマネージドサービスなのですが、
内部でSparkを使用しており、分散処理をさせることによって大容量のデータでも
かなり早い速度でETLを行う事が出来ます。

そこで、RDSからデータを取得し、
Glueでデータ変換を行い、S3(AWSのクラウド型オブジェクトストレージサービス)に格納するまでの
簡単なアーキテクチャーで、Glueを検証していきたいと思います。

今回のGlue検証

Glueでは、DynamicFrameオブジェクトで操作を行っていきます。
DynamicFrameからデータの書き込みを行う際には、
DataCatalogのデータベース/テーブルを出力先に指定します。

Glueから直接ファイル出力を行う際には、
DynamicFrameオブジェクトからDataFrameオブジェクトに変換し、
DataFrameからS3にファイル出力させる必要があります。

以上を踏まえ、
 ・DynamicFrame(DataCatalog)のデータ出力
 ・DynamicFrame ⇒ DataFrameのデータ出力
以上の検証を、それぞれの言語で行っていくこととします。

全4パターンの検証内容となります。

今回、検証していくアーキテクチャーを簡単に図に表してみました。

検証データ作成

まずは、検証に使うテストデータの生成を行います。

今回は、RDSを使用しますので、RDSを作成していきます。

ひとまず、簡単設定でRDSを作成しました。(t2.midium)
パブリックの設定はオフにし、VPC経由で接続していきます。

RDSの作成が完了しましたら、
RDSと同じVPC内にEC2を作成し、
作成したEC2からRDSに接続して、SQL文でデータベースを作成していきます。
(ここでは suzukidbという名前で作成しています。)

 
[root@ip-xxx-xx-xx-xx ~]# mysql -h suzuki.cluster-************.us-west-2.rds.amazonaws.com -P 3306 -u admin -p
Enter password:
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MySQL connection id is 101
Server version: 5.6.10 MySQL Community Server (GPL)

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MySQL [(none)]> create database suzukidb;
Query OK, 1 row affected (0.00 sec)
 

同じVPC内にLambdaを作成し、以下のコードを使用して、DB内に検証用のデータを作成していきます。

検証用のデータは、以下のようなものとします。

nameカラムに、suzuki, matsuoka, shimadaという名前3種類と、
Ageカラムに1~100000までの数値を登録し、全300000件のデータを用意します。

rds-lambda.py

import pymysql

def my_handler(event, context):
    conn = pymysql.connect(host="suzuki.cluster-************.us-west-2.rds.amazonaws.com", user="admin", password="*******", db="suzukidb", connect_timeout=5)

    with conn.cursor() as cur:
        ## テーブル作成
        cur.execute("create table testtable (Name varchar(255) NOT NULL, Age int NOT NULL, Sex varchar(255) NOT NULL)")
        sql = "INSERT INTO testtable (name, age, sex) VALUES (%s, %s, %s)"

        ## テストデータ(300000件挿入)
        for num in range(100000):
            cur.execute(sql, ("Suzuki", num, "man"))
        for num in range(100000):
            cur.execute(sql, ("Shimada", num, "women"))
        for num in range(100000):
            cur.execute(sql, ("Matsuoka", num, "man"))
        
        conn.commit()
 

LambdaからPythonを実行し、ステータスが完了になりましたら
実際にデータを確認していきます。

再度同じEC2からmysqlで接続し、countにてカラムが登録されていることを確認します。

MySQL [suzukidb]> select count(name),count(age),count(sex) from testtable;
+-------------+------------+------------+
| count(name) | count(age) | count(sex) |
+-------------+------------+------------+
|       100000|      100000|      100000|
+-------------+------------+------------+
1 row in set (0.02 sec)
 

上記のように表示されたら、検証用データの作成は完了です。

GlueJob作成

早速、GlueJob(以下、Jobとします)の作成に入っていきます。

Glueでは、実際に動作させるJobを作成する前に
RDSとの接続の設定を行うconnectorを作成し、
クローラを使用してGlueデータベースを作成し、
それを基にGlueJobを作成します。

分かりづらいので、一つ一つ操作して行きます。

Connectorの作成

まず、RDSとの接続を行うConnectorを作成していきます。
AWSコンソールから、Glueを選択し、Glueのページに移動します。

左の選択画面から接続を押下し、RDSとの接続設定を行います。
選択すると、以下のような画面が出力されます。

「接続の追加」ボタンを押下します。

接続名をtest-suzuki-db-connectorに設定し、
接続タイプをJDBCに設定します。

先ほど作成したRDSの接続情報を入力します。

最終的な設定内容は以下です。
設定内容に不備がなければ、「完了」ボタンを押下します。

これで、RDSの接続部分の設定が完了しました。
「接続のテスト」を押下することで、接続確認ができます。

クローラの作成

クローラとは、RDB等データソースの接続情報から、
DataCatalog(Glueテーブル)を自動で生成してくれる機能です。

手動でDataCatalogを生成することも出来ますが、
データソース(RDB)が存在する為、ここではクローラを使用します。

コンソール画面左一覧から、クローラを選択し、「クローラの追加」ボタンを押下します。

以下の画面に遷移されますので、クローラの名前を入力して「次へ」ボタンを押下します。

source typeに、Data storesを選択し、「次へ」ボタンを押下します。

データストアにJDBCを指定し、先ほど作成したconnectorを選択した後に「次へ」ボタンを押下します。

別のデータストアの追加はしないので、いいえを選択し「次へ」を押下します。

IAMロールを選択し、「次へ」ボタンを押下します。
(ここでのIAMロールでは、RDSの接続を許可しているロールを選択してください)

オンデマンドで実行を選択し、「次へ」ボタンを押下します。

詳細画面が表示されますので、問題無ければ「完了」ボタンを押下してください。

作成したクローラを起動させます。

成功すると、先ほどRDSで作成したテーブルのDataCatalogが作成されています。


出力側Glueテーブル定義

そうしましたら次は、
出力側(S3)のDataCatalogを作成していきます。

出力側は、元となるデータソースが無いため、
クローラを使用せず、手動でDataCatalogを定義します。

DataCatalog欄のデータベースを選択し、
「データベースの追加」を選択します。

データベース名を記載した後、「作成」ボタンを押下します。

以下のようにデータベースが作成されますので、
suzuki-s3-dbのテーブルを押下します。

まだテーブル情報が追加されていないため、「テーブルの追加」ボタンを押下し、
「手動でのテーブルの追加」を選択します。

テーブル名を入力し、データベースを選択して「次へ」を押下します。

出力先のS3インクルードパスを入力し、「次へ」を押下します。

出力タイプを指定し、「次へ」を押下します。

スキーマの定義を入力し、「次へ」を押下します。

確認画面へ遷移しますので、
内容が合っているかを確認し、「完了」ボタンを押下してください。

以上で、テーブルの手動追加は完了となりました。

GlueJobの作成

それでは、下準備が完了しましたので、
Jobの定義をします。

左のカテゴリから「ジョブ」を選択し、「ジョブの追加」ボタンを押下します。

ジョブプロパティを、以下のように設定し「次へ」を押下します。
(基本デフォルトのままですが、GlueVersionでは言語PythonのSpark 2.4を選択しています。)

入力データソースとして、先ほどクローラで作成したsuzuki-rdb_dbを選択します。

「スキーマを変更する」を選択し、「次へ」を押下します。

S3出力用に手動で作成した、suzuki-s3-dbを選択した後、「次へ」を押下します。

変換方式として、
nameカラムに入っている値をsexカラムに挿入し、
sexカラムに入っている値を、nameカラムに挿入するように設定します。

以下の画面に遷移し、検証パターン①のPythonGlueコードが自動で作成されます。

検証パターン① PythonDynamicFrame実装(自動作成)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "suzuki-rdb-db", table_name = "suzukidb_testtable", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "suzuki-rdb-db", table_name = "suzukidb_testtable", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("name", "string", "sex", "string"), ("age", "int", "age", "int"), ("sex", "string", "name", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("name", "string", "sex", "string"), ("age", "int", "age", "int"), ("sex", "string", "name", "string")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["name", "age", "sex"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["name", "age", "sex"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "resolvechoice3")
## @type: DataSink
## @args: [database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = resolvechoice3]
datasink4 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice3, database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "datasink4")
job.commit()
 

続いて、同様にGlueにてScalaコードでの自動生成を行います。

先ほどのJob作成と同様の設定とし、
以下の設定項目だけScalaを選択すれば、ScalaGlueコードが自動作成されます。

検証パターン② ScalaDynamicFrame実装

(自動作成)

import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    // @type: DataSource
    // @args: [database = "suzuki-rdb-db", table_name = "suzukidb_testtable", transformation_ctx = "datasource0"]
    // @return: datasource0
    // @inputs: []
    val datasource0 = glueContext.getCatalogSource(database = "suzuki-rdb-db", tableName = "suzukidb_testtable", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
    // @type: ApplyMapping
    // @args: [mapping = [("name", "string", "sex", "string"), ("age", "int", "age", "int"), ("sex", "string", "name", "string")], transformation_ctx = "applymapping1"]
    // @return: applymapping1
    // @inputs: [frame = datasource0]
    val applymapping1 = datasource0.applyMapping(mappings = Seq(("name", "string", "sex", "string"), ("age", "int", "age", "int"), ("sex", "string", "name", "string")), caseSensitive = false, transformationContext = "applymapping1")
    // @type: SelectFields
    // @args: [paths = ["name", "age", "sex"], transformation_ctx = "selectfields2"]
    // @return: selectfields2
    // @inputs: [frame = applymapping1]
    val selectfields2 = applymapping1.selectFields(paths = Seq("name", "age", "sex"), transformationContext = "selectfields2")
    // @type: ResolveChoice
    // @args: [choice = "MATCH_CATALOG", database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "resolvechoice3"]
    // @return: resolvechoice3
    // @inputs: [frame = selectfields2]
    val resolvechoice3 = selectfields2.resolveChoice(choiceOption = Some(ChoiceOption("MATCH_CATALOG")), database = Some("suzuki-s3-db"), tableName = Some("testtable"), transformationContext = "resolvechoice3")
    // @type: DataSink
    // @args: [database = "suzuki-s3-db", table_name = "testtable", transformation_ctx = "datasink4"]
    // @return: datasink4
    // @inputs: [frame = resolvechoice3]
    val datasink4 = glueContext.getCatalogSink(database = "suzuki-s3-db", tableName = "testtable", redshiftTmpDir = "", transformationContext = "datasink4").writeDynamicFrame(resolvechoice3)
    Job.commit()
  }
}
 

以上で、Scala,Python両言語の
DynamicFrame実装が完了となりました。

DynamicFrame性能検証

DynamicFrame実装したJobを、
Pythonの場合とScalaの場合とでそれぞれ実行し、
SparkUIにて性能を5回ずつ計測してみました。

GlueJobの結果をSparkUIで確認する方法は以下をご覧ください
(参考: https://aws.amazon.com/jp/blogs/news/how-to-use-spark-ui/)

検証パターン① Python タスク毎のメトリクスのサマリ

検証パターン② Scala タスク毎のメトリクスのサマリ

5回計測したDuration計測値を表にまとめました。

検証パターン①(Python)

検証パターン②(Scala)

それぞれの平均値をまとめると以下となりました。

検証パターン①と検証パターン②を比べてみても、
性能差異はほとんど見られませんでした。

DataFrame実装

先ほど作成したJobスクリプトを編集し、
DynamicFrameのtoDF()メソッドにて
DataFrameに変換し、
withColumnメソッドにてnameカラムとsexカラムを入れ替えた実装とします。

検証パターン③ PythonDataFrame実装

 
---------------パターン①とここまで同コードの為省略---------------
 
    df = DynamicFrame.toDF(glueContext.create_dynamic_frame
         .from_catalog("suzuki-rdb-db", "suzukidb_testtable"))

    df.withColumn('sex1',F.lit(F.col("sex")))
      .withColumn("sex",F.lit(F.col("name")))
      .withColumn("name",F.lit(F.col("sex1")))
      .select("name", "age", "sex")
      .write.json("s3://" + writeBucket)

    job.commit()
 

検証パターン④ ScalaDataFrame実装

 
---------------パターン②とここまで同コードの為省略---------------
 
    val df = glueContext.getCatalogSource("suzuki-rdb-db","suzukidb_testtable")
             .getDynamicFrame().toDF()

    df.withColumn("sex1",lit($"sex"))
      .withColumn("sex",lit($"name"))
      .withColumn("name",lit($"sex1"))
      .select("name", "age", "sex")
      .write.json("s3://" + writeBucket)

    Job.commit()
  }
}
 

以上2言語のJobスクリプトを編集し保存すれば完了です。

DataFrame性能検証

DataFrame実装したJobを、
Pythonの場合とScalaの場合とでそれぞれ実行し、
SparkUIにて性能差を比較してみました。

検証パターン③ Python タスク毎のメトリクスのサマリ

検証パターン④ Scala タスク毎のメトリクスのサマリ

こちらも5回ずつ計測を行い、
Duration計測値を表にまとめました。

検証パターン③(Python)

検証パターン④(Scala)

それぞれの平均値をまとめると以下となりました。

結論

結果的に、DynamicFrameを使用した
検証パターン①と②のDuration値に、殆ど差異はありませんでしたが、
DataFrameを使用した検証パターン③と④のDuration値は
75th percentileの値が、Scalaは0.9sの処理速度だったのに対し、
Pythonでは4s
となったため、多少の劣化が見られた様に思いました。

よって、本検証では多少の差ではありますが、
DynamicFrameでは性能差異は現れなかったものの、
GlueでDataFrameを使用する場合は、Scala有利の結果となりました

そして多少ではありますが、検証パターン①②と③④を比べると
検証パターン①②のDynamicFrameのみで実装した方が、
全体的に処理速度が速い
という結果が分かりました。

今回はここまでの検証となりますが、
DynamicFrameとDataFrameの内部のどこで処理速度に差が出ているのか
機会があれば検証してみたいと思います。

最後に

多少ではあるものの、Scalaの方が性能有利の結果となりました。

Sparkは、基本Scala言語で作成されている為、
個人的には思ったとおりの検証結果となり満足です。

最後までお読みいただき、ありがとうございました!

鈴木 和翔
CSVIT事業部 IT部 鈴木 和翔