はじめに

こんにちは。

キャスレーコンサルティング IT(インテグレーションテック)部の平井です。

今回はGoogle Cloud Platform(以下GCP)で、
サーバレスにバッチ処理を行うアーキテクチャを組んでみようと思います。

1日1回しか動かない処理のために、インスタンスを立ち上げ続けたりしていると、
インスタンスのコストや監視コストが余計にかかってしまいます。

そこでGCPのサービスをフル活用してラクをしよう!
というのが本稿の趣旨になります。

目次

1. 事前準備
2. 使用するCGPのサービス
 - 2.1 Cloud Scheduler
 - 2.2 Cloud Functions
 - 2.3 Cloud Dataflow
 - 2.3.1 Cloud Dataflow Template
 - 2.4 備考
3. 作成するアーキテクチャの概要
4. Cloud Functionsで実行するコードを用意しよう
5. Cloud Schedulerを設定しよう
6. 実際に動かしてみよう
7. 終わりに

1. 事前準備

GCPを使用する準備をしましょう。

GCPではアカウントを作成すると、12ヶ月使用できる無料クレジットが300ドル分付与されます。
(※2019年4月時点)

このクレジットを利用して、GCPのサービスを試すことが出来ます。

本稿では、アカウントの作成方法などは触れません。
詳しくはこちらのドキュメントをご参照ください。

2. 使用するGCPのサービス

2.1 Cloud Scheduler

公式ドキュメントの紹介によると、Cloud Scheduler は
エンタープライズ クラスのフルマネージド cron ジョブ スケジューラ
となっています。

crontab形式でジョブの実行時間を設定でき、
App Engine・Cloud Pub/Sub・任意のHTTPエンドポイントを対象に、ジョブが実行されます。

GCPサービスを実行するトリガーの役割をCloud Schedulerが果たしてくれます。

公式ドキュメントはこちらです。

2.2 Cloud Functions

公式ドキュメントの紹介によると、Cloud Functionsは
イベント駆動型のサーバーレス コンピューティング プラットフォーム
となっています。

恐らく、もっとも多い紹介の仕方としては「GCPにおけるAWS Lambda」だと思います。

Cloud Functionsのトリガーとしては、
HTTPトリガー・Cloud Pub/Subトリガー・Cloud Storageトリガーなどがあります。

公式ドキュメントはこちらです。

2.3 Cloud Dataflow

公式ドキュメントの紹介によるとCloud Dataflow は、
さまざまなデータ処理パターンの実行に対応したマネージド サービス
となっています。

Apach Beamを利用した、大規模データを処理するためのプラットフォームとなっています。

バッチ処理の場合、処理実行中にのみインスタンスが立ち上がり、処理が終わったらインスタンスが削除されるので、インスタンスの管理をする必要がない便利なサービスとなっています。

公式ドキュメントはこちらです。

2.3.1 Cloud Dataflow Template

Cloud Dataflow Templateを利用すると、
Cloud Dataflowのコードが予めCloud Storageにステージングされるので、
パイプラインの実行時に毎回コンパイルする必要がなくなります。

今回はGoogleから提供されているWordCountのテンプレートを使用します。

公式ドキュメントはこちらです。

2.4 備考

2.1~2.3までで名前は出ていたのですが、紹介していないGCPのサービスをここで紹介します。

本稿では、これらのサービスを利用しないので軽い紹介に留めています。

  • App Engine :フルマネージド型のサーバーレスなアプリケーション プラットフォームです。
  • Cloud Pub/Sub:GCPのメッセージキューサービスです。
  • Cloud Storage:GCPのストレージサービスになります。

3. 作成するアーキテクチャの概要

今回は図のようにアーキテクチャを組もうと思います。

処理の内容としては、

  • 1. 時間起動でCloud SchedulerからCloud Functionsへジョブを実行。
  • 2. Cloud FunctionsからCloud Dataflow Templateを実行します。
  • 3. Cloud DataflowではWordCountというサンプルを使用します。

※WordCountではCloud Storageにあるファイルを読み取り、ファイル内に出てきた単語をカウントし、
 その結果をCloud Storageに出力します。

4. Cloud Functionsで実行するコードを用意しよう

公式ドキュメントにある、
Cloud Dataflowテンプレートを実行するPythonコードのサンプルを参考にして
以下のようなコードを作成しました。

こちらがコードになります。
※PROJECTには、ご自分のGCPプロジェクトIDをご入力ください。

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

def execute_dataflow_template(request):
    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials)

    PROJECT = 'example'
    GCSPATH = 'gs://dataflow-templates/latest/Word_Count'
    BODY = {
        'jobName': 'word-count-test',
        'parameters': {
            'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
            'output': 'gs://serverless-batch-test/output/my_output'
        },
        'environment': {
            'tempLocation': 'gs://serverless-batch-test/temp',
            'zone': 'us-central1-f'
        }
    }

    request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
    response = request.execute()
    print(response)

これをCloud Functionsに登録します。

まずは「関数を作成」を押してください。

続いて、ランタイムからPython3.7を選択してください。

main.pyに、用意したCloud Dataflowテンプレートを実行するPythonコードを入力します。

requirements.txtに、以下画像のライブラリを追加します。

こちらが、requirements.txtに記入したライブラリになります。

google-api-python-client==1.7.8
oauth2client==4.1.3

実行する関数を「execute_dataflow_template」に変更します。

最後に「作成」を押します

5. Cloud Schedulerを設定しよう

1. 「ジョブを作成」を押します
2. 「名前」に「test-scheduler」を入力
3. 「頻度」に「*/10 * * * *」を入力
4. 「ターゲット」に「HTTP」を選択
5. 「URL」に4で設定したCloud FunctionsのURLを設定
 →Cloud Functionsの「トリガー」にURLがあるので、そのURLを設定します。
6. 「HTTPメソッド」を「GET」に設定します。

設定が完了したら「作成」を押してください。

6. 実際に動かしてみよう

Cloud Schedulerが動くのを待ってみましょう。

Stackdriver loggingから、ログを確認してみましょう。

Cloud Schedulerのジョブが起動したら、Cloud Functionsを見てみましょう。
Cloud Schedulerから呼び出されたのがわかります。

続いて、Cloud Dataflow Consoleを見てみましょう。
ジョブが正常に完了しているのがわかります。

最後に、Cloud Storageを確認してみましょう。
Cloud Dataflowで処理した結果のファイルが作成されています。

すべての処理が完了していることが確認できたと思います。

7. 終わりに

今回はGoogle提供のCloud Dataflow Templateを使用したので、
実際に実装したのはCloud Functionsのコードのみです。

その他は画面から設定しただけで、サーバレスなバッチ処理のアーキテクチャを組むことが出来ました。

AWSと比べると利用者の少ないGCPですが、興味を持った方は無料トライアルで触ってみてください!

最後までお読みいただき、ありがとうございました。

平井 拓海
CSVIT事業部 IT(インテグレーションテック) 平井 拓海
2017年新卒。GCPを使用した開発、運用、保守を行っています。