皆さん、こんにちは。

キャスレーコンサルティング ID(インテグレーション&デザイン)部の鈴木(和)です。

現在、自動運転の研究開発プロジェクトに参画しており、
そこで学んだことをもとに
今回はビッグデータ関連の主なツールを使用してブログにしました。
ビッグデータに興味のある方は是非、御覧下さい。

自分がインタビューを受けたブログもありますので、
興味がある方は、コチラもぜひ御覧になってください。

内容

以下の二つの機能を作るまでの内容となっています。

・Fluentdを使用して疑似センサーデータを生成し、
  Kafkaメッセージブローカーを使用して、センサーデータを毎秒受け取る機能

・ Kafkaメッセージブローカー経由にて、SparkStreamingからセンサーデータを毎秒受け取り、
  分散DBであるCassandraに格納する機能

目次

①環境構築
②Fluentdによるセンサーデータの作成
③Kafkaによるデータ取得
④SparkStreamingを使用してKafkaからメッセージを取得
⑤Cassandraのインストールと起動
⑥SparkStreamingを使用してDB格納操作を実装
※全体イメージ図
主な使用ツールとバージョン

  • Vagrant centos-7.2
  • td-agent 1.2.2
  • Kafka_2.11-1.0.0
  • Spark 2.3.1
  • Cassandra 3.11.3
  • Teraterm

    ①環境構築

    1,VirtualBoxのインストール

    http://www.oracle.com/technetwork/jp/server-storage/virtualbox/downloads/index.html

    2,Vagrantのインストール

    https://www.vagrantup.com/

    3,VagrantFileの記述
    VAGRANTFILE_API_VERSION = "2"
    
     Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
       config.vm.box = "centos7.2"
       config.vm.box_url = "https://github.com/CommanderK5/packer-centos-template/releases/download/0.7.2/vagrant-centos-7.2.box"
       config.vm.network "private_network", ip: "192.168.33.15"
       config.vm.network "forwarded_port", guest: 8888, host: 8888
    
       config.vm.provider "virtualbox" do |vm|
         vm.memory = 4096
       end
     end
      
    
    4,上記の用にVagrantFileを記述したらVagrantを起動する
    $ vagrant up
     Bringing machine 'default' up with 'virtualbox' provider...
     '==> default: Clearing any previously set forwarded ports...
     '==> default: Clearing any previously set network interfaces...
     '==> default: Preparing network interfaces based on configuration...
     default: Adapter 1: nat
     default: Adapter 2: hostonly
     '==> default: Forwarding ports...
     default: 8888 (guest) => 8888 (host) (adapter 1)
     default: 22 (guest) => 2222 (host) (adapter 1)
     '==> default: Running 'pre-boot' VM customizations...
     '==> default: Booting VM...
     '==> default: Waiting for machine to boot. This may take a few minutes...
     default: SSH address: 127.0.0.1:2222
     default: SSH username: vagrant
     default: SSH auth method: private key
     default: Warning: Connection reset. Retrying...
     default: Warning: Connection aborted. Retrying...
     default: Warning: Remote connection disconnect. Retrying...
     default: Warning: Connection aborted. Retrying...
     default: Warning: Remote connection disconnect. Retrying...
     default: Warning: Connection aborted. Retrying...
     default: Warning: Connection reset. Retrying...
     default: Warning: Connection aborted. Retrying...
     default: Warning: Connection reset. Retrying...
     default: Warning: Connection aborted. Retrying...
     default: Warning: Connection reset. Retrying...
     default: Warning: Connection aborted. Retrying...
     default: Warning: Connection reset. Retrying...
     '==> default: Machine booted and ready!
     '==> default: Checking for guest additions in VM...
     default: The guest additions on this VM do not match the installed version of
     default: VirtualBox! In most cases this is fine, but in rare cases it can
     default: prevent things such as shared folders from working properly. If you see
     default: shared folder errors, please make sure the guest additions within the
     default: virtual machine match the version of VirtualBox you have installed on
     default: your host and reload your VM.
     default:
     default: Guest Additions Version: 4.3.30
     default: VirtualBox Version: 5.2
     '==> default: Configuring and enabling network interfaces...
     default: SSH address: 127.0.0.1:2222
     default: SSH username: vagrant
     default: SSH auth method: private key
     '==> default: Mounting shared folders...
     default: /vagrant => C:/Users/PCUser
     '==> default: Machine already provisioned. Run `vagrant provision` or use the `--provision`
     '==> default: flag to force provisioning. Provisioners marked to run always will still run.
      
    
    5,Vagrantの状態を確認
    $ vagrant status
     Current machine states:
    
     default                   running (virtualbox)
    
     The VM is running. To stop this VM, you can run `vagrant halt` to
     shut it down forcefully, or you can run `vagrant suspend` to simply
     suspend the virtual machine. In either case, to restart it again,
     simply run `vagrant up`.
      
    

    Virtualboxの状態が「runnnig」となっていれば、Vagrantが無事起動となります。

    6,Vagrantが無事起動したらTeraterm(ターミナルソフト)から接続してログイン

    192.168.33.15 に対して接続 ユーザー名 : vagrant パスワード : vagrant

    7,必要コンポーネントのインストール
    $ sudo yum install -y git git-core java-1.8.0-openjdk epel-release python -pip gcc python-devel libevent-devel
    8,Sparkのインストール ※本ブログではSpark2.3.1を使用

    以下サイトより、ダウンロードしたいバージョンを選択してください。 https://spark.apache.org/downloads.html

      $ sudo su -
     $ wget https://www-us.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz

    以下のコマンドでは、取得したファイルを解凍し、Sparkのディレクトリを作成している。

      $ tar xvzf spark-2.3.1-bin-hadoop2.7.tgz
     $ mv spark-2.3.1-bin-hadoop2.7 /usr/local/
     $ ln -s /usr/local/spark-2.3.1-bin-hadoop2.7/ /usr/local/spark
     $ exit 
      
    
    9,次に環境変数の設定
      $ export SPARK_HOME=/usr/local/spark
      $ export PATH=${SPARK_HOME}/bin:${PATH}
      $ export PYTHONPATH=${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}
      
    
    10,試しにscalaのインタラクティブシャルを起動しましょう
    $ spark-shell
    11,次にpythonのインタラクティブシャルを起動しましょう
    $ pyspark

    ②Fluentdによるセンサーデータの作成

    本ブログでは、SUMOと呼ばれる交通シミュレーションソフトから吐き出される
    走行データを基に毎秒ごとのデータを疑似的に発生させ、
    分散処理を使用して、CassandraDBへと格納します。

    SUMO(Simulation of Urban MObility)とは、 道路網を疑似的に交通処理するように設計され、
    高度にポータブル顕微鏡と連続道路交通シミュレーションを行うオープンソースパッケージです。

    シミュレーションを行った車両の1秒ごとの情報が得られ、情報として主に

  • vehicleId (車両ID)
  • Speed (分/km)
  • 緯度
  • 経度
  • シミュレーション時間 (秒単位)
  • 等がxml形式で吐き出されます。(他情報も吐き出されていますが、ここでは割愛します)
    詳しいSUMOのシミュレーションについては、以下をご参照下さい。

    https://qiita.com/nattof/items/d198557126b7d3636f10

    SUMOから吐き出された走行データをJSON化し、 Fluentdによって情報収集し、
    Kafkaメッセージブローカーへと送信します。

    Fluentdとは

    米Treasure Data社が中心となり、「Log everything in JSON」を掲げ、
    すべてのログをJSONとして扱うことを目的として、オープンソースとして開発されている
    ログ収集管理ツールです。

    Rubyで実装されており、データを収集するのにインプットプラグイン、データを保持・移動するときに
    バッファプラグイン、データを出力するのにアウトプットプラグインを用います。

    互換性のあるプラグインを使用することで機能を拡張でき、その組み合わせにより、
    さまざまなログデータの活用方法を変えることが出来る非常に便利なシステムです。

    今回はFluentdを使って、簡単に車両走行センサーデータの取得を実現しましょう。

    1,td-agentのインストール
    $ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh
    2,td-agentの起動
    $ systemctl start td-agent
    3.td-agentの起動確認
    $ systemctl status td-agent
    4.configの設定(configファイルを編集する)
    $ vi /etc/td-agent/td-agent.conf

    configファイルの変更前と変更後になります。主に、項目を追加しています。

    使用した各タグの役割について、まとめました。

    他にも様々なパラメーターがあり、プラグインによっても様々な設定をすることができます。

    こちらの公式ページに、Fluentdのパラメーターの一覧やどんな意味を持つものか等が
    こちらに記載されているので、Fluentdに興味があればご参照ください。

    https://docs.fluentd.org/v1.0/articles/plugin-common-parameters

    これでFluentdの設定は、完了となります。

    ③Kafkaによるデータ取得

    Apache Kafkaとは

    Apache Kafkaは、2011年にLinkedInから公開された
    オープンソースの分散メッセージングシステムです。

    Kafkaは分散させることによって、大容量のデータ(ログやイベント等)を
    高スループット且つ低レイテンシで、収集/配信することが出来ます。

    そして、「Apache ZooKeeper」を動作させることによって、
    複数のKafkaサーバを連携させて、クラスタ化します。
    クラスタ化により耐障害性/高可用性を実現します。
    メッセージはファイルに保存され、クラスタ内でレプリカが作成されるため
    データの損失を防ぐ事が出来ます。
    (ログなども複数のサーバーから、どこか一箇所にまとめておく設定なども可能)

    そしてなんといってもスケーラブル性にすぐれており、
    ダウンタイムを必要とせず拡張することができます。

    今回は、そんなApache Kafkaをシングルノードで動作させ、メッセージを収集/配信させます。

    Kafkaへデータを送信する側はProducerと呼ばれ、データを受信する側をConsumerと呼びます。
    Topicsとは、メッセージ連携するためのトンネルの様なイメージです。
    Brokerはクラスタを構成し、ProducerとConsumerの間でメッセージの受け渡しを行う
    キューとして動作します。

    「Producer」 「Consumer」 「KafkaBroker」 「ZooKeeper」 の考え方のイメージを図示してみました。  
    (messageを今回はSUMOデータとして図示しています)

    1,Kafkaのインストール
      $ wget -O /tmp/kafka_2.11-1.0.0.tgz http://ftp.riken.jp/net/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz		
     $ tar -xzf /tmp/kafka_2.11-1.0.0.tgz -C /opt		
     $ mv /opt/kafka_2.11-1.0.0 /opt/kafka
      
    
    2.ZooKeeperの起動
    $ nohup /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties &	

    Zookeeperについて(Wikipedia引用)

    Apache ZooKeeperは、Apacheソフトウェア財団のオープンソースプロジェクトで、
    大規模分散システムでよく利用される、設定情報の集中管理や名前付けなどのサービスを
    提供するソフトウェアです。

    最初は、Hadoopのサブプロジェクトの一つでしたが、
    現在はトップレベルプロジェクトの一つになっています。

    ZooKeeperのアーキテクチャでは、高可用性を冗長サービスにより提供している。

    つまり、クライアントはあるZooKeeperノードへの問い合わせが失敗したら、
    他のノードに問い合わせることができます。

    データの更新は一つのマスターノードだけが行うようになっているので、
    データがノード間で矛盾した内容になることはありません。
    (ただし、最新のデータでない可能性はある)。

    更新を担当するマスターノードが、何らかの理由で停止した場合には、
    各ノード間で選挙を行い、新たな更新ノードが選ばれます。

    ZooKeeperは、データを階層的な名前空間に保存しているが、
    これはファイルシステムや、トライ木のデータ構造によく似ています。
    クライアントは、このノードに読み書きを行うことによって、
    設定情報共有などのサービスを提供してくれます。

    ZooKeeperは、「en:Rackspace」や「Yahoo!」などの企業で、
    「Solr」のようなオープンソースのエンタープライズサーチシステムで、使用されています。

    それでは、ZooKeeperを起動したところで、次にkafkaの起動を行います。

    3.Kafkaの起動
    $ nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &
    4.Topicの作成
    $ /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sumo-data		
     Created topic "sumo-data".
      
    

    Created topic “sumo-data” と表示されれば、成功です。

    5,作成したTopicがどのように配置されているかの確認
    $ /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 sumo-data
    6. パーティション構成の確認
    $ /opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic sumo-data
    7. 試しにkafka to kafka にてメッセージ送信をしてみる
    $ /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sumo-data

    ※コマンド実行すると、プロンプトが入力待ち状態となります。
    この状態で何か文字を打ち込むと、Kafka Brokerへメッセージを送信することができます。
    例として、文字を入力してみましょう。

    $ /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sumo-data		
     >CasleyConsulting
      
    

    Producer(送信側)から、「CasleyConsulting」と文字を打ち込みました。
    8番目の項目で、受信してみましょう。

    Kafkaを使用するにあたり、考えなければいけないことが少なくとも2つあります。
    それは、メッセージの保証度と分散のさせ方です。

    メッセージの保証度に関して、☆acks 分散のさせ方については ☆Partitioner があり、
    それぞれ簡単にですが説明いたします。

     

    ☆acks

    Kafkaにメッセージを送信した場合には、
    Broker側がメッセージ受信を成功したとする条件を、設定することができます。
    それが、acksというパラメーターです。

    acksにどのような値を設定すると、どのような動作になり、
    メッセージ受信のどのタイミングで受信成功とするかを、簡単にまとめてみました。

    acks = 0 :
         Producer側がメッセージを送信したとするタイミングで送信の成功とします。      
         つまり、Producer側がメッセージをソケットバッファの書き込みしたとするタイミングで      
         送信完了とするため、本当に書き込めているか分かりません。      
         逆に言えば、送信完了のタイミングが早いため、低レイテンシと言えます。
    acks = 1 :
         Producer側のメッセージ送信後、
         KafkaBrokerに存在するリーダーパーティションへの送信完了時に送信完了とします。      
         レプリカパーティションへの同期を待たず、
         送信完了とするため仮に送信完了時のタイミングで      
         リーダーパーティションノードへの障害が発生した場合にメッセージロストへ繋がります。
    acks = all :
         Producer側のメッセージ送信後、      
         レプリカパーティションまでの送信完了時に送信完了とします。      
         メッセージの保証度は高くなりますが、
         一つ一つのメッセージの送信完了時間が伸びるため、      
         レイテンシがかなり遅くなります。

    ここからは個人の意見ですが、Kafkaを使用するということは、
    ほとんどビックデータを扱うということなので、
    メッセージ一つ一つの保証度はかなり低くても、量でカバーするイメージです。
    つまりビックデータというものに関して言えばこの設定をする場合は、
    ほとんど少ないのではないでしょうか。

     

    ☆Partitioner

    Partitionerを特に指定せずにメッセージをProduceすると、デフォルトのPartitionerが使用され、
    KafkaPartitionにランダムでメッセージが振り分けられます。

    ラウンドロビン以外に、ID毎や何かの種類毎にメッセージを振り分けることによって、
    Consumeした際に指定した値ごとのまとまったメッセージを取得することができます。

    注意点として、定期的なセンサーデータを例とした場合に、
    KafkaのいずれかのPartitionにメッセージが偏ってしまうと、
    分散している意味があまりなくなってしまいますので、
    あくまで均等に分散を考えながらPartitionerをカスタムする事が重要です。

     

    今回のユースケースでは分散は行わず、シングルノードにて処理を行うため、
    デフォルトPartitionerを使用します。

    8.メッセージを受信できているかの確認
    $ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sumo-data --from-beginning

    コマンドを打ち込み、先ほど打ち込んだ「CasleyConsulting」という文字列が受け取れれば成功です。

    ・Fluentd と Kafkaを連携させ、定期的なセンサデータの投入を行う

    今までの設定で、既にFluentdからKafkaへの連携を実現することができます。

    /var/log/sumo_data/sumo_data.log 内に1秒ごとに、JSON形式のデータを投入することによって
    自動でFluentdがメッセージを読み出し、Kafkaへと連携してくれます。

    試しに、 /var/log/sumo_data/sumo_data.log にJSONデータを記載してみましょう。

    例) {“vehicleid”:00001,”speed”:40.9,”longitude”:35.642259,”latitude”:139.713458,”vehicletime”:”2018/11/04 16:42:01″}

    記載したら、Kafkaにてメッセージが受信できていることを確認します。

    $ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sumo-data --from-beginning

    無事受信出来ていることを確認できれば、成功です。

    これでようやく、センサーデータ実現の環境構築が終わりました。
    次に、SparkStreamingを使ってデータ格納をしていきます。

    ④SparkStreamingを使用してKafkaからメッセージを取得

    ・SparkStreamingを使用してKafkaからセンサデータを取得し、
      コンソールに表示させるサンプルアプリを作成

    以下のサンプルアプリを、pythonにて実装してみましょう。

    # coding: UTF-8
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
    # SparkSessionの作成 (kafkaConsoleTest)
    spark = SparkSession.builder.appName("kafkaConsoleTest").getOrCreate
    spark.sparkContext.setLogLevel("WARN")

    # sumo-data トピックからメッセージを取得する
    kafkaData = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "192.168.33.10:9092").option("subscribe", "sumo-data").load()

    # 文字列データとして取得する
    kafkaDataString = kafkaData.selectExpr("CAST(value AS STRING) as value")

    # 変換した文字列をコンソール出力させる.
    query = kafkaDataString.writeStream.outputMode("append").format("console").start()

    # 継続的に処理を実行する
    query.awaitTermination()

    ファイル名を KafkaConsoleTest.pyとし、/opt/sparkapplication/ 内に保存したとして起動させる。

    $ /usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /opt/sparkapplication/kafkaConsoleTest.py	

    起動中に、Terminalを多重起動させ、
    /var/log/sumo_data/sumo_data.log にJSON形式のサンプルデータを記載する。

    Fluentdが自動的に読み込み記載したサンプルデータを取得し、Kafkaへと流れる。
    送信されたサンプルデータが、先ほど起動したSparkStreamingテストアプリケーションによって
    取得できていれば成功です。

    それでは、SparkStreaming と Cassandraデータベースを連携させ、
    ストリーミングでデータを格納する、アプリケーションの実装をする。

    (青枠部分の実装)

    ⑤Cassandraのインストールと起動

    1.Cassandraをパッケージとしてインストール

    /etc/yum.repos.d/cassandra.repo ファイルを作成

     cassandra.repo		
      [cassandra]		
      name=Apache Cassandra		
      baseurl=https://www.apache.org/dist/cassandra/redhat/311x/		
      gpgcheck=1		
      gpgkey=https://www.apache.org/dist/cassandra/KEYS
      
    
    $ yum install -y cassandra 

    実行するとCassandraがインストールされる。

    $ systemctl start cassandra
    2,cassandraが起動しているか確認
    $ systemctl start cassandra
    3.pytzのインストール

    Cassandraには、CQLを対話的に実行するcqlshというツールがあります。
    (postgreSQLでいうところのpsql)
    これを使用し、格納用のテーブルを定義します。

    cqlshの実行に必要な物としてPythonモジュールであるpytzが必要なため、インストールします。
    以下コマンドを実行。

    $ pip install pytz
    4,Cassandraタイムゾーンの設定

    Cassandraでは、タイムスタンプ型で保持される時刻のタイムゾーンは、UTCとなっているため、
    これを日本標準時に設定します。

    rootディレクトリに 「.cassandra」 というディレクトリを作成し、
    cqlshの設定を行う、cqlshrcというファイルを作成します。

    このファイルはcqlshの設定ファイルとなり、ここでタイムゾーンの設定をします。
    以下コマンドを実行。

    .cassandraディレクトリの生成

    $mkdir ~/.cassandra

    cqlshファイルの生成、編集

    $ vi ~/.cassandra/cqlshrc
    [cqlshrc]		
       [ui]		
        timezone=Asia/Tokyo

    この設定をすれば、自分のサーバーと同じ日本標準時となります。

    5.それではcqlshを起動し、テーブルを定義しましょう

    cqlshコマンドを実行して、cqlshコマンドラインへ接続

    $ cqlsh
    6.テーブルを格納するキースペースの作成
    $ cqlsh> CREATE KEYSPACE vehcle_data WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

    キースペースを定義する際に、キースペースに格納されるデータがどのように複製されるかを
    指定する必要があります。

    今回はreplication_factor に1を指定し、
    データの複製が作成されずクラスタ内に、一つだけ作成されるようにしています。

    7. 作成したキースペースに移動し、sumodata格納用のテーブルの定義を行う

    キースペースへ移動

    $ cqlsh> USE vehicle_data	

    指定したキースペースからテーブルを定義

    $ cqlsh:vehicle_data> CREATE TABLE IF NOT EXISTS vehicle_sensor_data (		
                                           vehicleid int,		
                                           speed int,		
                                           longitude float,		
                                           latitude float,		
                                           vehicletime timestamp,		
                                           PRIMARY KEY (vehicleid, vehicletime)		
                                    WITH CLUSTERING ORDER BY (vehicletime ASC);
      
    

    PRIMARY KEY に、vehicletid と vehicletimeを指定することによって、
    同じvehicleidでもvehicletime が違っていた場合に、違うレコードとして保存することができます。

    秒単位でデータを連携する場合など一意制約違反とならないように、
    キーに「vehicleid」と「vehicletime」の2つを設定しています。

    SparkStreamingを使用し、先ほど定義したテーブルに定期的にデータを投入するアプリを実装します。

    SparkからCassandraDBへの接続に、
    今回は spark-cassandra-connector というライブラリを使用します。
    spark-cassandra-connector 等のSpark用のライブラリは、
    Spark Packages http://spark-packages.org にてダウンロードできます。

    ⑥SparkStreamingを使用してDB格納操作を実装

    それでは、Spark-cassandora-connector を使用してKafkaから送信されるストリームデータを
    Cassandoraに格納していきます。

    SparkStreamingは、RDD単位で処理を行うため、
    Cassandoraに格納する際には、DataFrameに変換する必要があります。
    DataFrameに変換し、DataFrameWriterを使用してCassandoraへデータ格納していきます。

    RDD(Resilient Distributed Dataset)とは

    不変(イミュータブル)で並列実行可能な(分割された)データセットです。
    不変性、耐故障性、分散に優れ、基本的なSparkの処理単位となっています。

    特徴としては、不変性なため処理を行うたびに、現在のRDDに処理を実行することによって
    子のRDDに変換されるという方向性となることです。

    つまり、子のRDDが親のRDDに戻ることがない、一方方向へと処理が流れることになります。

    Sparkでは変換処理自体はDAGに記録されるだけで、
    その後に行うアクション時に記録されたDAG情報から、
    変換処理の最適な実行プランを立ててくれることで、
    不要なデータがメモリ上に残らないように制御出来ます。

    DataFrameとは、

    イメージとして、RDBのテーブルのようなイメージの構造化されたデータセットとなっています。
    構造化データなため、クエリ処理の実行や管理が簡単に実現できます。
    DataFrameを使用するメリットとして、RDDよりもクエリの実行計画を最適化してくれることにより、
    性能が高いというところです。

    Kafkaから定期的にデータを受け取り、Cassandraへ格納する処理を実装します。

    CassandraStorage.py # coding: UTF-8
    import json
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    if __name__ == "__main__":

    spark = SparkSession.builder.appName("CassandoraStorage").getOrCreate()
    spark.sparkContext.setLoglevel("WARN")

    ssc = StreamingContext(spark.context, 1)

    kafkaData = KafkaUtils.createDirectStream(ssc, ["sumo-data"], {"metadata.broker.list": "192.168.33.10:9092"})

    schema = StructType().add("vehicleid",LongType()).add("speed",DoubleType()).add("latitude",DoubleType()).add("longitude",DoubleType()).add("vehicletime",StringType())

    #Cassandra格納用のメソッドを定義
    def storageCassandra(rdd):
    if not rdd.isEmpty():
    dataFrame = rdd.toDF(schema)
    #Cassandraテーブル定義に従って変換
    csDataFrame = dataFrame.withColumn("vehicleTime",fromunixtime(unix_timestamp(dataFrame.vehicleTime,"yyyy/MM/dd HH:mm:ss"))) \
    .select("vehicleid", "speed", "longitude", "latitude", date_format("vehicleTime", "yyyyMMdd").alias("vehicleTime"))
    #vehicle_dataキースペースvehicle_sensor_dataにデータを格納
    csDataFrame.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="vehicle_sensor_data", keyspace="vehicle_data") \
    .sava()

    #Kafkaから取得したRDDを引数にCassandra格納用メソッドを呼び出す
    kafkaData.map(lamdba (k, v): json.loads(v)).foreachRDD(storageCassandra)

    #ストリーミング処理の開始と継続を実行
    ssc.start()
    ssc.awaitTermination()

    それでは作成したプログラムを実行してみましょう。
    作成したCassandraStorage.py を /opt/sparkapplication/ 内に保存し、以下を実行します。

    実行コード
    $ /usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.6 /opt/sparkapplication/CassandraStorage.py

    プログラムを実行し、データがvehicle_sensor_dataテーブル内に格納されているか調べてみましょう。

    $ cqlsh		
    		
    cqlsh> use vehicle_data		
    cqlsh:vehicle_data> SELECT * FROM vehicle_sensor_data;	
      
    

    データが継続的に格納されているのが確認できたら成功です。
    これでセンサーデータを疑似的に作り出し、
    継続的にCassandraデータベースに格納することができました。

    分散環境の考慮として、
    SparkとCassandraの組み合わせは、分散環境によってかなりの効率を作り出します。
    かなり前の日経 xTECH(クロステック)の記事ですが、(2015年くらい)
    Cassandraについて分かりやすく解説してあります。

    https://tech.nikkeibp.co.jp/it/atcl/column/14/090100053/090800083/

    NetFlix等で使用されているSpark-Cassandraですが、
    トラフィックが非常に多い環境でも、Cassandraは無停止を実現できることに強みがあります。

    動画というかなり大きなビックデータを、
    常に送り続けなければならないユースケースで停止することなく
    速く動作させるのは、まさしくSpark-Cassandraの連携がなせる物と言えるでしょう。

    Cassandraというデータベースは、スケールアウトできるのが特徴で、
    開発資金さえかければ(サーバー台数分)無限に分散させることができます。

    ※ただ非効率な分散の仕方をすると、それこそ無限に開発資金の無駄遣いとなってしまいます。

    そしてなんといっても、マスターノード等といった概念が無いため、
    設定次第でデータロストを防ぐことができます。

    4台のサーバーによって分散させている環境を前提とすると、
    サーバーのどれか1台が故障してしまった場合でも、
    データをロストすることなく動作を続けることができ、
    耐障害性、スケーラビリティー、 Spark等の分散処理との併用を求めるのであれば
    CassandraはかなりオススメなDBとなっています。

    長々となりましたが、どれもビッグデータにおいて必要不可欠なものですので、
    少しでも読んでくださった方の助けになれば幸いです。

鈴木 和翔
CSVIT事業部 鈴木 和翔
中途入社1年目。自動運転コネクティッドカーのフレームワーク作成の現場に所属しているためビッグデータについてのブログを書いてみました。