こんにちは。

キャスレーコンサルティングの 鈴木 和翔 です。



今回は、現場業務でビッグデータ負荷検証を

「KinesisDataStream & Lambda」 にて検証した際に洗い出された

課題点とそれに対して行った対策案等を紹介して行こうと思います。

 

負荷条件

今検証での主な負荷条件は以下となります。



・1レコード(1リクエスト容量) 150KB 程

・求めるスループット 4000TPS 達成




つまり、どういう流量になるかというと、

 
1秒間 150KB * 4000(TPS) = 約586MB

1分間 586MB * 60(秒) = 約34GB

1時間 34GB * 60(分) = 約2040GB
 

1時間に 約2040GBものデータが流れる事となります!

(間違えて自分のアカウントで検証すると一気に破産しますね。。)

アーキテクチャ

今検証でのアーキテクチャーの紹介です。



「KinesisDataStream & Lambda」

でのビッグデータ検証アーキテクチャーを簡単に図にしてみました。







簡単に検証のアーキテクチャーの流れを説明すると、



という検証用の簡単な構成となります。

ちなみに今回使用した 「KinesisDataStream」 への

データ投入を行う 「Lambda」アプリと

「S3」への保存を行う 「Lambda」アプリ の言語には 「Java」 を使用し、

「Kinesis」 へのデータ投入に使用したライブラリは 「java-aws-sdk」 を使用しました。

試験用のアプリ紹介



今回の検証にて作成した簡単なアプリがありますので、紹介していきます。


①EC2からLambda関数を非同期に呼び出すアプリ



LambdaInvoker.java

 
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.AWSLambdaAsync;
import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder;
import com.amazonaws.services.lambda.model.InvokeRequest;

class LambdaInvoker {
  //プロセスID
  private static AtomicInteger atomicProcessId = new AtomicInteger(0);
  //TPS値
  private static int tps = 100;
  //起動したいLambda名
  static final private String lambdaFunctionName = "kinesis-test";
  //リージョン指定
  static final String regionName = Regions.AP_NORTHEAST_1.toString();
  //エグゼキューター
  private static ExecutorService exec;
  //プロセス起動時間(テスト時間/秒)
  private static int processTimes = 60;
  private static AWSLambdaAsync lambda;
  //Lambda送信用文字列
  private static String function_input = "{\"who\":\"AWS SDK for Java\"}";

  public static void main(String[] args) {
    //スレッドプール作成
    exec = Executors.newFixedThreadPool(128);
    //実処理実行
    process();
  }

  private static void process() {
    try {
      // スレッドの停止タイマーを先に実行しておく
      shutdownTimer();
      //Lambda送信オブジェクトの生成
      lambda = AWSLambdaAsyncClientBuilder.standard()
          .withRegion(Regions.AP_NORTHEAST_1).build();
      // Lambdaを起動する
      sendTimer(lambdaFunctionName, regionName);
    } catch (Exception e) {
      System.out.println("massage send failure: " + e);
    }
  }

  private static void sendTimer(String lambdaFunctionName, String regionName) {

    // デーモンスレッドで実行
    Timer timer = new Timer(false);

    timer.scheduleAtFixedRate(new TimerTask() {
      int timerCount = 1;

      @Override
      public void run() {
        showStatus(timerCount);
        for (int i = 0; i < tps; i++) {
          exec.execute(() -> invokeFunction(lambdaFunctionName, regionName));
        }
        if (timerCount >= processTimes) {
          timer.cancel();
          System.out.println(timerCount + "/" + processTimes + "execute complete.");
        }
        timerCount++;
      }
    }, 0, 1000); //task, delay, period
  }

  /**
   * Lambda送信用メソッド.
   * @param lambdaFunctionName
   * @param regionName
   */
  private static void invokeFunction(String lambdaFunctionName, String regionName) {
    int processId = atomicProcessId.incrementAndGet();
    System.out.println("processId: " + processId);
    //非同期リクエスト作成
    InvokeRequest req = new InvokeRequest()
        .withFunctionName(lambdaFunctionName)
        .withPayload(ByteBuffer.wrap(function_input.getBytes()))
        .withInvocationType("Event");
    lambda.invoke(req);
  }

  /**
   * @param timerCount カウント
   */
  private static void showStatus(int timerCount) {
    if (timerCount % 10 == 0 || processTimes <= 10) {
      System.out.println("processTimes: " + timerCount + "/" + processTimes);

      Runtime runtime = Runtime.getRuntime();
      Map<String, Object> statusMap = new LinkedHashMap<>();
      statusMap.put("totalMemory", runtime.totalMemory() / 1024 + "KB");
      statusMap.put("maxMemory", runtime.maxMemory() / 1024 + "KB");
      statusMap.put("freeMemory", runtime.freeMemory() / 1024 + "KB");
      statusMap.put("activeThreads", Thread.activeCount());

      System.out.println(statusMap.toString());
    }
  }

  /**
   * シャットダウンタイマー
   */
  private static void shutdownTimer() {
    Timer shutdown = new Timer(false);
    shutdown.schedule(new TimerTask() {

      @Override
      public void run() {
        exec.shutdownNow();
        shutdown.cancel();
        System.out.println("ExecutorService shutdown.");
      }
    }, processTimes * 1000); //task, delay(ms)
  }
}
 



pom.xml

 
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>lambda-invoke</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>lambda-invoke</name>
    <url>http://maven.apache.org</url>

    <!-- fatJar生成用 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>test.lambda_invoker.LambdaInvoker</mainClass>
                        </manifest>
                    </archive>

                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-lambda -->
        <!-- Lambda呼び出し用ライブラリ -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-lambda</artifactId>
            <version>1.11.642</version>
        </dependency>
    </dependencies>
</project>

 

 

②KinesisDataStreamにデータを投入するLambdaアプリ



KinesisProducer.java

 
import java.nio.ByteBuffer;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

public class KinesisProducer implements RequestHandler<Object, String> {
  //リージョン名指定
  String regionName;
  //Kinesisストリーム名指定
  String streamName;
  //150KBの電文を作成
  String testJson;

  String partitionKey;

  public KinesisProducer() {
    this.regionName = new String("ap-northeast-1");
    this.streamName = new String("test-kinesis-stream");
    this.partitionKey = new String("suzuki-0000001");
    this.testJson = "{\"test\":\"a";
    //150KB分のJSON文字列を作成
    for (int i = 10; i < 153611; i++) {
      this.testJson += "a";
    }
    testJson += "\"}";
  }

  @Override
  public String handleRequest(Object input, Context context) {

    AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();

    clientBuilder.setRegion(regionName);
    AmazonKinesis kinesisClient = clientBuilder.build();

    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setStreamName(streamName);
    putRecordRequest.setData(ByteBuffer.wrap(testJson.getBytes()));
    putRecordRequest.setPartitionKey(partitionKey);
    kinesisClient.putRecord(putRecordRequest);

    return "test";
  }
}
 



pom.xml

 
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>kinesis-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kinesis-producer</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <!-- === Begin maven-shade-plugin === -->
    <!-- shade-plugin はLambda用Jarを生成する場合に必須です。 === -->
    <!-- mvn goal指定でも可能ですが、自動生成されるようにしました。=== -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                    <shadedArtifactAttached>true</shadedArtifactAttached>
                    <shadedClassifierName>aws</shadedClassifierName>
                    <transformers>
                        <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>test.kinesis_producer.KinesisProducer</mainClass>
                        </transformer>
                    </transformers>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-core -->
        <!-- AWSサービスとのやり取りに必要なライブラリ -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-core</artifactId>
            <version>1.11.642</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-lambda -->
        <!-- Lambdaサービスとの通信用ライブラリ -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-lambda</artifactId>
            <version>1.11.642</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-lambda-java-core -->
        <!-- Lambdaハンドラ作成に必要なライブラリ -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-core</artifactId>
            <version>1.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-kinesis -->
        <!-- Kinesisサービス利用時に必要なライブラリ -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.642</version>
        </dependency>
    </dependencies>
</project>
 

③KinesisDataStreamからデータを受け取ってS3に格納するLambdaアプリ



KinesisConsumer.java

 
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

public class KinesisConsumer implements RequestHandler<KinesisEvent, String> {

  //バケット名
  String bucketName;
  //ファイルパス
  String filePath;
  //ファイル名
  String fileName;
  AmazonS3 s3;

  public KinesisConsumer() {
    this.bucketName = new String("test-bucket");
    this.filePath = new String("/test/");
    this.fileName = new String("test.bin");
    this.s3 = new AmazonS3Client(new ClasspathPropertiesFileCredentialsProvider());
  }

  @Override
  public String handleRequest(KinesisEvent input, Context context) {

    for (KinesisEventRecord ker : input.getRecords()) {
      // Recordを取得
      Record record = ker.getKinesis();

      // レコードをバイナリで取得
      byte[] byteArray = new byte[record.getData().remaining()];
      record.getData().get(byteArray);
      String json = new String(byteArray);
      try {
        //テキストファイルをローカルに作成する
        File file = new File("./tmp/" + fileName);
        try (FileWriter filewriter = new FileWriter(file)) {
          filewriter.write(json);
          //S3にファイルをputする
          s3.putObject(bucketName, filePath + fileName, file);
        }
      } catch (IOException e) {
        System.out.println(e);
      }
    }
    return "test";
  }
}
 



pom.xml

 
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>kinesis-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kinesis-consumer</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <!-- === Begin maven-shade-plugin === -->
    <!-- shade-plugin はLambda用Jarを生成する場合に必須です。 === -->
    <!-- mvn goal指定でも可能ですが、自動生成されるようにしました。=== -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                    <shadedArtifactAttached>true</shadedArtifactAttached>
                    <shadedClassifierName>aws</shadedClassifierName>
                    <transformers>
                        <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>test.kinesis_consumer.KinesisConsumer</mainClass>
                        </transformer>
                    </transformers>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-core -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-core</artifactId>
            <version>1.11.642</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-lambda -->
        <!-- Lambdaサービスとの通信用ライブラリ -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-lambda</artifactId>
            <version>1.11.642</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-lambda-java-core -->
        <!-- Lambdaハンドラ作成に必要なライブラリ -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-core</artifactId>
            <version>1.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-lambda-java-events -->
        <!-- Lambdaで特殊なイベントを取得する際に必要(S3Event 等) -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-events</artifactId>
            <version>2.2.7</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-kinesis -->
        <!-- Kinesisサービス利用時に必要なライブラリ -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.642</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-s3 -->
        <!-- S3サービス利用時に必要なライブラリ -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.11.642</version>
        </dependency>

    </dependencies>
</project>
 





これらのサンプルアプリケーションを使用してビッグデータ検証を行いました。

AWSサービスにおける制限の前提



検証での課題の話をする前に抑えておきたいポイントが1点あります。

それは、AWSのサービスには制限が存在しますが、

緩和出来る制限緩和出来ない制限が存在することです。



「KinesisDataStream」 「Lambda」 の制限について軽くまとめてみました。

・KinesisDataStreamの制限

(参考開発者ガイド:

 https://docs.aws.amazon.com/ja_jp/streams/latest/dev/service-sizes-and-limits.html)



・1ストリーム 辺りのシャード数 制限無し (制限緩和申請によって最大シャード数の制限緩和が可能)

・1シャード辺りの 書き込み制限容量 1 秒あたり最大 1 MiB (制限緩和 不可)

・1シャード辺りの 書き込み制限レコード数 1 秒あたり最大 1000レコード (制限緩和 不可)

・1シャード辺りの 読み込み制限容量 1 秒あたり最大 2 MiB (制限緩和 不可)

・1シャード辺りの 書き込み制限レコード数 1 秒あたり最大 1000レコード (制限緩和 不可)

・各シャードは 1 秒あたり最大 5 件のトランザクションをサポート (制限緩和 不明)

・GetRecords への呼び出しで 10 MiB が返される場合、

 次の 5 秒以内に行われたそれ以降の呼び出しでは、例外がスローされる。(制限緩和 不明)

・Lambdaの制限

(参考開発者ガイド:

 https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/limits.html)



・同時実行数 1000 (制限緩和申請によって同時実行数の制限緩和が可能)

・関数とレイヤーストレージ 75GB (制限緩和申請によってストレージリソースの量の制限緩和が可能)

・関数のメモリ割り当て 128 MB ~ 3,008 MB まで、64 MB ごとに増加可能。(制限緩和 不可)

・関数タイムアウト 最大900 秒 (15 分) (制限緩和 不可)

・関数の環境変数 4 KB まで (制限緩和 不可)

・関数リソースベースのポリシー 20 KB まで (制限緩和 不可)

・関数レイヤー 5 つのレイヤー (制限緩和 不可)

・関数の同時実行数のバースト 500~3000 (リージョンによって異なる) (制限緩和 不可)

・呼び出しの頻度 (1 秒あたりのリクエスト)

   10 倍の同時実行数の制限 (同期的 ? すべてのリソース) (制限緩和 不可)

   10 倍の同時実行数の制限 ( 非同期的 ? AWS 以外のソース) (制限緩和 不可)

   無制限 (非同期的 ? AWS サービスのソース) (制限緩和 不可)

・呼び出しペイロード(リクエストとレスポンス)

   6 MB (同期)(制限緩和 不可)

   256 KB (非同期)(制限緩和 不可)

・デプロイパッケージサイズ

   50 MB (zip 圧縮済み、直接アップロード)(制限緩和 不可)

   250 MB (解凍、例: レイヤー)(制限緩和 不可)

   3 MB (コンソールエディタ)(制限緩和 不可)

・テストイベント (コンソールエディタ)

   10 (制限緩和 不可)

・/tmpディレクトリのストレージ

   512 MB (制限緩和 不可)

・ファイルディスクリプタ

   1,024 (制限緩和 不可)

・実行プロセス/スレッド

   1,024 (制限緩和 不可)



以上が「KinesisDataStream」 「Lambda」によるサービスの制限となります。

発生した課題の解決



ビッグデータ検証を行うにあたってAWSサービスによる制限による課題点が多かったため、

前段にてサービス制限について触れました。



今回紹介したいのは検証によって発生した以下の2点の課題についてのお話しと、

それに対してどう突破したかについてご紹介したいと思います。

①「KinesisDataStream」の書き込み上限数による課題



先ほど紹介した 「KinesisDataStream」 の制限にて以下の項目があったかと思います。



・1シャード辺りの 書き込み制限容量 1 秒あたり最大 1 MiB (制限緩和 不可)

・1シャード辺りの 書き込み制限レコード数 1 秒あたり最大 1000レコード (制限緩和 不可)



たとえば 秒間 500MiBのデータを 「KinesisDataStream」 に格納したいという要件の場合、

1シャード辺りの書き込み制限容量 は1秒あたりに最大1MiBの為

単純計算で500シャードがあれば要件を満たせることとなります。



ですが、ただ500Mibのデータを格納する為に500シャードに納めるというのは、

ある程度工夫をしなければなりません。



なぜなら、「KinesisDataStream」に対してデータ投入をする際に

必ず等分散でデータ投入されるわけではないからです。



どういう事かというと、



「KinesisDataStream」のシャード振り分けの為のハッシュ値が「partitionkey」という項目から

MD5ハッシュ関数を用いて計算されますが、

この振り分け方法では完全な等分散とならないのです。







レコード数の偏りによって書き込み上限を超えてしまったシャードから

書き込み上限エラーが発生してしまいます。



つまり MD5ハッシュ関数を使用した分散の方法では必ず偏りが発生してしまい、

それに伴って書き込み上限エラーが発生する可能性が存在するという事です。



これに対しての解決策としては2つほどあります。



★解決策①

純粋にシャード数を増やし、ある程度シャード単位でのバラつきが発生しても

書き込み上限エラーとならない方法もしくは書き込み上限エラーを許容するという方法。




その通りの方法となりますが、デメリットとしてかなりの料金が発生してしまうのと

完全にエラーを無くすことはできないことです。



ちなみに今回の検証の場合は 等分散されていれば 590シャード程あれば足りるという計算でしたが、

 
4000TPS * 150KB = 585.9375MB = 590)
 



完全に書き込み上限エラーが発生しなくなるまでシャード数を増やしたところ、

1700シャードまで増やしても書き込み上限エラーが1件発生してしまいました。



コスト3倍以上でも、まだ書き込み上限エラー発生のリスクがある為に、この解決策はボツとします。



★解決策②

「KinesisDataStream」にデータ投入される際に「MD5ハッシュ関数」を使用せず、

自らハッシュ値を指定して指定シャードへと格納させる方法。




えっそんなこと出来るの?って感じですが、「KinesisDataStream」にレコードをputする際に

ハッシュ値をそのまま指定することが出来ます。



そのハッシュ値を基に「KinesisDataStream」側がシャード単位で振り分けているのですが、

法則さえ分かれば可能です。







じゃあ具体的にシャード指定ってどうすればいいの?というところの説明を致します。



MD5ハッシュ値での最大値は16進数で 

 
FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF 
 

となっています。



これを10進数に変換すると

 
340282366920938463463374607431768211455
 

という値となります。



この整数値を基にKinesisDataStreamはシャードを振り分けているのですが、

例えばKinesis内の シャード数が 2シャードだった場合は、

 
シャード1: 
 0 ~ 170141183460469231731687303715884105727
シャード2: 
 170141183460469231731687303715884105728 ~ 340282366920938463463374607431768211455
 



のハッシュ値が割当たっており、この値に該当するハッシュ値を直接指定することで、シャード指定をすることが出来ます。



ちなみにシャード毎のハッシュ値の求め方ですが、2シャードの場合は

 
シャード1: 
 0 ~ 340282366920938463463374607431768211455/2
シャード2: 
 340282366920938463463374607431768211455/2+1~340282366920938463463374607431768211455
 





となります。



KinesisDataStreamを4シャードとした場合は、以下のハッシュ値となります。

 
シャード1: 
 0 ~ 85070591730234615865843651857942052863
シャード2: 
 85070591730234615865843651857942052864 ~ 170141183460469231731687303715884105727
シャード3: 
 170141183460469231731687303715884105728 ~ 255211775190703847597530955573826158591
シャード4: 
 255211775190703847597530955573826158592 ~ 340282366920938463463374607431768211455
 





このハッシュ値の仕組みさえ分かればレコード毎に格納するシャードを直接指定する事が出来ます。

その為、この振り分け方を上手く利用して等分散すればコストも最小限に抑える事が出来る上に、

書き込み上限エラーを完全に無くすことが出来ると言えるでしょう。



★実際にやってみた。

実際に4シャードのKinesisDataStreamを作成し、

設定したハッシュ値通りにレコードが格納されるか検証してみました。

KinesisDataStream を作成します。

ストリーム名 test-kinesis-stream

シャード数 4






検証プロデューサーアプリは以下となります。



KinesisProducer.java

 
import java.nio.ByteBuffer;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

public class KinesisProducer implements RequestHandler<Object, String> {
  //リージョン名指定
  String regionName;
  //Kinesisストリーム名指定
  String streamName;
  //150KBの電文を作成
  String testJson;
  //partitionKey指定
  String partitionKey;
  //ハッシュ値指定
  String explicitHashKey;

  public KinesisProducer() {
    this.regionName = new String("ap-northeast-1");
    this.streamName = new String("test-kinesis-stream");
    this.partitionKey = new String("suzuki-0000001");
    this.testJson = "{\"test\":\"a";
    //150KB分のJSON文字列を作成
    for (int i = 10; i < 153611; i++) {
      this.testJson += "a";
    }
    testJson += "\"}";
  }

  @Override
  public String handleRequest(Object input, Context context) {

    AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();

    clientBuilder.setRegion(regionName);
    AmazonKinesis kinesisClient = clientBuilder.build();

    //それぞれのシャードに格納されるようにレコードを生成
    PutRecordRequest shard1KinesisRecord = recordCreator("0");
    PutRecordRequest shard2KinesisRecord = recordCreator("85070591730234615865843651857942052864");
    PutRecordRequest shard3KinesisRecord = recordCreator("170141183460469231731687303715884105728");
    PutRecordRequest shard4KinesisRecord = recordCreator("255211775190703847597530955573826158592");

    //Kinesisに各レコードをputする
    PutRecordResult putRecordResult1 = kinesisClient.putRecord(shard1KinesisRecord);
    PutRecordResult putRecordResult2 = kinesisClient.putRecord(shard2KinesisRecord);
    PutRecordResult putRecordResult3 = kinesisClient.putRecord(shard3KinesisRecord);
    PutRecordResult putRecordResult4 = kinesisClient.putRecord(shard4KinesisRecord);

    //どのシャードに格納されたかリザルト情報から取得する。
    System.out.println("格納シャード:" + putRecordResult1.getShardId());
    System.out.println("格納シャード:" + putRecordResult2.getShardId());
    System.out.println("格納シャード:" + putRecordResult3.getShardId());
    System.out.println("格納シャード:" + putRecordResult4.getShardId());

    return "test";
  }

  /**
   * 引数で指定されたhashKeyの値を基にKinesisRecordを作成する.
   * @param hashKey
   * @return PutRecordRequest
   */
  public PutRecordRequest recordCreator(String hashKey) {
    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setStreamName(streamName);
    putRecordRequest.setData(ByteBuffer.wrap(testJson.getBytes()));
    putRecordRequest.setPartitionKey(partitionKey);
    //ハッシュキーの設定
    putRecordRequest.setExplicitHashKey(hashKey);
    return putRecordRequest;
  }
}
 



今回は4つのレコードを作成し、それぞれにExplicitHashKeyを設定して

各シャードにそれぞれ1つずつレコードが格納されるかを確認します。



確認にはKinesisDataStreamのリザルト情報からShardIdを取得し、

デバッグログにて確認致しました。



CloudWatchLogs





それぞれのシャードにバラバラに格納されていることが確認出来ました!



試しにshardId-000000000003のみにレコードを集約してみたいと思います。

先ほどの KinesisProducerクラス内の

setExplicitHashKey()メソッド引数部分のみを変更して検証してみました。

 
//shardId-000000000003に格納されるような値を挿入したレコードを生成
PutRecordRequest shard1KinesisRecord = recordCreator("255211775190703847597530955573826158592");
PutRecordRequest shard2KinesisRecord = recordCreator("275211775190703847597530955573826158592");
PutRecordRequest shard3KinesisRecord = recordCreator("305211775190703847597530955573826158592");
PutRecordRequest shard4KinesisRecord = recordCreator("340282366920938463463374607431768211455");
 



もう一度デバッグログを確認すると





shardId-000000000003 のシャードにだけレコードを格納することが確認できました!



★結論



①KinesisDataStreamの書き込み上限数による課題は、

ハッシュ値を直接指定することで解決しましょう!!


②Lambda初回起動による処理遅延が発生してしまう課題



※この課題に関しては2019年9月頃より、AWS様が解決へと動いてくれています。



どういう課題が発生していたかというと、

LambdaからVPCエンドポイントに接続しなければならない要件がある場合、

LambdaをVPC内に配置する事が必須だが、LambdaをVPC内に配置すると

コンテナ作成時にENI(ElasticNetworkInterface)の作成処理とアタッチ処理が走ってしまい、

 
ENIの作成処理 → アタッチ処理 → Lambda処理開始 
 

という動きとなってしまう為、

ENIの作成とアタッチ時間分遅延が発生してしまうという現象が発生していました。

(約5~60秒程かかっていました。)



ちなみにその現象は 「コールドスタート」 と名付けられており、

対処方法としてはping処理という方法ぐらいしか解決策がなかったのですが、

AWS様が解決したことによりこの課題による遅延は無くなる見込みです。(私はまだ未検証です)

(参考開発者ガイド:

 https://aws.amazon.com/jp/blogs/news/announcing-improved-vpc-networking-for-aws-lambda-functions)



※このブログが記載される2019年10月時点ではまだ全てのリージョンに対して改修が行われていないため、

念のため対処法について残しておきます。



★解決策



VPC内にLambdaを設置した際に発生する「コールドスタート」への対策方法ですが、

先ほど解説したping処理によって回避する方法があります。



そもそも何故コールドスタートが発生するのかというと、







という流れの中でLambdaにENIがアタッチされていないからという事になります。



では、既にLambdaにENIがアタッチされていればコールドスタートは発生しない

という事になりますので、常にリクエストをLambdaに投げ続けていれば、

ENIが削除されることが無く常にENIがアタッチされ続けるという事になるわけです。



常にアタッチされ続けている状況になれば、

ENIの作成処理が行われる事が極端に少なくなる為

コールドスタートの発生確率をグンと少なくなるというものです。



常にと言ってもLambdaで行っている処理自体を邪魔するわけにはいきませんので、

「Lambda」コンテナが削除されない範囲内(5分くらい)で

定期的に「Lambda」を最大同時実行数分キックしてやればいい
というわけです。



★Lambdaキック用アプリ

Lambdaキック用のLambdaサンプルアプリケーションです。

このアプリは、

対象Lambdaの同時実行数分発火させることができるLambdaアプリとなります。



変数lambda_name には発火させたいLambda名を入力

変数concurrent_executions には上げたい同時実行数を入力

以上の設定を行うことで、対象「Lambda」に非同期でリクエストを送信することが出来ます。



※warm_up処理をnode.jsで実装したのには理由があります。

 JavaやPythonで実装してもよかったのですが、「Lambda」はJVMの起動が極端に遅い為

 素早く処理を行うことが出来るnode.jsにて実装しました。



warmup_handler.js

 
//aws-sdk変数取得
const aws = require("aws-sdk");
//リージョン設定
aws.config.region = "ap-northeast-1";
//Lambda変数取得
const lambda = new aws.Lambda();

//warmup対象のLambda名
var warmup_lambda_name = 'warmup_lambda_name';
//warmupLambda同時実行数の設定
var concurrent_executions = 20;

//functionsの設定変数
const functions = [{"name":{warmup_lambda_name},"config":{"enabled":true,"payload":"{\"source\":\"serverless-plugin-warmup\"}","concurrency":{concurrent_executions}}}];

//Lambda関数宣言
module.exports.warmUp = async (event, context) => {
  const invokes = await Promise.all(functions.map(async (func) => {
    //Lambda起動パラメーターの設定
    const params = {
      ClientContext: Buffer.from(`{"custom":${func.config.payload}}`).toString('base64'),
      FunctionName: func.name,
      InvocationType: "Event",
      LogType: "None",
      Qualifier: process.env.SERVERLESS_ALIAS || "$LATEST",
      Payload: func.config.payload
    };
    try {
      //設定した同時実行数分非同期でLambda呼び出し
      await Promise.all(Array(func.config.concurrency).fill(0)
      .map(async _ => await lambda.invoke(params).promise()))
      return true;
    } catch (e) {
      return false;
    }
  }));
}
 



プログラムを実行させると、対象Lambdaへの非同期リクエストが送信され、

リクエストによって対象Lambdaが起動される仕組みとなっております。



今回の要件だと5分置きにLambdaの同時実行数を最大まで上げるという事なので、

warmup_handler.js を実装したLambdaに対して

「CloudWatchEvents」を5分置きに仕掛けることによって、5分間隔でLambdaを起動出来ます。









以上の「CloudWatchEvents」では様々な起動設定が出来ますので、

有効活用してみては如何でしょうか!





ちなみに「Lambda起動」によって作成されるENIの本数は概算することが出来ます。

(参考開発者ガイド:

 https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/vpc.html)



必要ENI本数の概算の式

 
Projected peak concurrent executions * (Memory in GB / 3GB)
 



Projected peak concurrent executions とは

Lambdaの最大同時実行数(リクエストによって実際に上がった同時実行数)。



Memory in GB とはLambdaで設定しているメモリ量。



例えば 200msecで処理できる producerLambdaアプリが

4000TPSという性能を求められているときの同時実行数は単純計算で

 
4000(リクエスト/秒) / 5 (1000msec / 200msec)  = 800 
 

つまり800同時実行数となります。



producerLambdaアプリの設定メモリ数が 1GB(1024MB) だった場合は

 
800(Projected peak concurrent executions) * (1(Memory in GB) / 3) = 266.6 
 



約267本のENIが必要という事になります!



ビッグデータ検証を行う場合は、

・リージョンあたりのネットワークインターフェイス 制限(デフォルト値) 350

ですので今回の検証では、ENI本数制限を超えることは無かったですが、

LambdaをVPC内で動作させるといった際には、この辺の制限等も気にしてみてください。



以上、最後までお読み頂き、ありがとうございました。

鈴木 和翔
CSVIT事業部 鈴木 和翔
自動運転フレームワーク開発業務に携わっています。AWSでビッグデータ検証を行う際に事前に明確な課題が少なかったためまとめてみました。