皆さん、こんにちは。
キャスレーコンサルティング ID(インテグレーション&デザイン)部の鈴木(和)です。
現在、自動運転の研究開発プロジェクトに参画しており、
そこで学んだことをもとに
今回はビッグデータ関連の主なツールを使用してブログにしました。
ビッグデータに興味のある方は是非、御覧下さい。
自分がインタビューを受けたブログもありますので、
興味がある方は、コチラもぜひ御覧になってください。
内容
以下の二つの機能を作るまでの内容となっています。
・Fluentdを使用して疑似センサーデータを生成し、
Kafkaメッセージブローカーを使用して、センサーデータを毎秒受け取る機能
・ Kafkaメッセージブローカー経由にて、SparkStreamingからセンサーデータを毎秒受け取り、
分散DBであるCassandraに格納する機能
目次
①環境構築
②Fluentdによるセンサーデータの作成
③Kafkaによるデータ取得
④SparkStreamingを使用してKafkaからメッセージを取得
⑤Cassandraのインストールと起動
⑥SparkStreamingを使用してDB格納操作を実装
※全体イメージ図
主な使用ツールとバージョン
- Vagrant centos-7.2
- td-agent 1.2.2
- Kafka_2.11-1.0.0
- Spark 2.3.1
- Cassandra 3.11.3
- Teraterm
①環境構築
1,VirtualBoxのインストール
http://www.oracle.com/technetwork/jp/server-storage/virtualbox/downloads/index.html
2,Vagrantのインストール
3,VagrantFileの記述
VAGRANTFILE_API_VERSION = "2" Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| config.vm.box = "centos7.2" config.vm.box_url = "https://github.com/CommanderK5/packer-centos-template/releases/download/0.7.2/vagrant-centos-7.2.box" config.vm.network "private_network", ip: "192.168.33.15" config.vm.network "forwarded_port", guest: 8888, host: 8888 config.vm.provider "virtualbox" do |vm| vm.memory = 4096 end end
4,上記の用にVagrantFileを記述したらVagrantを起動する
$ vagrant up Bringing machine 'default' up with 'virtualbox' provider... '==> default: Clearing any previously set forwarded ports... '==> default: Clearing any previously set network interfaces... '==> default: Preparing network interfaces based on configuration... default: Adapter 1: nat default: Adapter 2: hostonly '==> default: Forwarding ports... default: 8888 (guest) => 8888 (host) (adapter 1) default: 22 (guest) => 2222 (host) (adapter 1) '==> default: Running 'pre-boot' VM customizations... '==> default: Booting VM... '==> default: Waiting for machine to boot. This may take a few minutes... default: SSH address: 127.0.0.1:2222 default: SSH username: vagrant default: SSH auth method: private key default: Warning: Connection reset. Retrying... default: Warning: Connection aborted. Retrying... default: Warning: Remote connection disconnect. Retrying... default: Warning: Connection aborted. Retrying... default: Warning: Remote connection disconnect. Retrying... default: Warning: Connection aborted. Retrying... default: Warning: Connection reset. Retrying... default: Warning: Connection aborted. Retrying... default: Warning: Connection reset. Retrying... default: Warning: Connection aborted. Retrying... default: Warning: Connection reset. Retrying... default: Warning: Connection aborted. Retrying... default: Warning: Connection reset. Retrying... '==> default: Machine booted and ready! '==> default: Checking for guest additions in VM... default: The guest additions on this VM do not match the installed version of default: VirtualBox! In most cases this is fine, but in rare cases it can default: prevent things such as shared folders from working properly. If you see default: shared folder errors, please make sure the guest additions within the default: virtual machine match the version of VirtualBox you have installed on default: your host and reload your VM. default: default: Guest Additions Version: 4.3.30 default: VirtualBox Version: 5.2 '==> default: Configuring and enabling network interfaces... default: SSH address: 127.0.0.1:2222 default: SSH username: vagrant default: SSH auth method: private key '==> default: Mounting shared folders... default: /vagrant => C:/Users/PCUser '==> default: Machine already provisioned. Run `vagrant provision` or use the `--provision` '==> default: flag to force provisioning. Provisioners marked to run always will still run.
5,Vagrantの状態を確認
$ vagrant status Current machine states: default running (virtualbox) The VM is running. To stop this VM, you can run `vagrant halt` to shut it down forcefully, or you can run `vagrant suspend` to simply suspend the virtual machine. In either case, to restart it again, simply run `vagrant up`.
Virtualboxの状態が「runnnig」となっていれば、Vagrantが無事起動となります。
6,Vagrantが無事起動したらTeraterm(ターミナルソフト)から接続してログイン
192.168.33.15 に対して接続 ユーザー名 : vagrant パスワード : vagrant
7,必要コンポーネントのインストール
$ sudo yum install -y git git-core java-1.8.0-openjdk epel-release python -pip gcc python-devel libevent-devel
8,Sparkのインストール ※本ブログではSpark2.3.1を使用
以下サイトより、ダウンロードしたいバージョンを選択してください。 https://spark.apache.org/downloads.html
$ sudo su - $ wget https://www-us.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
以下のコマンドでは、取得したファイルを解凍し、Sparkのディレクトリを作成している。
$ tar xvzf spark-2.3.1-bin-hadoop2.7.tgz $ mv spark-2.3.1-bin-hadoop2.7 /usr/local/ $ ln -s /usr/local/spark-2.3.1-bin-hadoop2.7/ /usr/local/spark $ exit
9,次に環境変数の設定
$ export SPARK_HOME=/usr/local/spark $ export PATH=${SPARK_HOME}/bin:${PATH} $ export PYTHONPATH=${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}
10,試しにscalaのインタラクティブシャルを起動しましょう
$ spark-shell
11,次にpythonのインタラクティブシャルを起動しましょう
$ pyspark
②Fluentdによるセンサーデータの作成
本ブログでは、SUMOと呼ばれる交通シミュレーションソフトから吐き出される
走行データを基に毎秒ごとのデータを疑似的に発生させ、
分散処理を使用して、CassandraDBへと格納します。SUMO(Simulation of Urban MObility)とは、 道路網を疑似的に交通処理するように設計され、
高度にポータブル顕微鏡と連続道路交通シミュレーションを行うオープンソースパッケージです。シミュレーションを行った車両の1秒ごとの情報が得られ、情報として主に
- vehicleId (車両ID)
- Speed (分/km)
- 緯度
- 経度
- シミュレーション時間 (秒単位)
-
等がxml形式で吐き出されます。(他情報も吐き出されていますが、ここでは割愛します)
詳しいSUMOのシミュレーションについては、以下をご参照下さい。https://qiita.com/nattof/items/d198557126b7d3636f10
SUMOから吐き出された走行データをJSON化し、 Fluentdによって情報収集し、
Kafkaメッセージブローカーへと送信します。Fluentdとは
米Treasure Data社が中心となり、「Log everything in JSON」を掲げ、
すべてのログをJSONとして扱うことを目的として、オープンソースとして開発されている
ログ収集管理ツールです。Rubyで実装されており、データを収集するのにインプットプラグイン、データを保持・移動するときに
バッファプラグイン、データを出力するのにアウトプットプラグインを用います。互換性のあるプラグインを使用することで機能を拡張でき、その組み合わせにより、
さまざまなログデータの活用方法を変えることが出来る非常に便利なシステムです。今回はFluentdを使って、簡単に車両走行センサーデータの取得を実現しましょう。
1,td-agentのインストール
$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh
2,td-agentの起動
$ systemctl start td-agent
3.td-agentの起動確認
$ systemctl status td-agent
4.configの設定(configファイルを編集する)
$ vi /etc/td-agent/td-agent.conf
configファイルの変更前と変更後になります。主に、項目を追加しています。
使用した各タグの役割について、まとめました。
他にも様々なパラメーターがあり、プラグインによっても様々な設定をすることができます。
こちらの公式ページに、Fluentdのパラメーターの一覧やどんな意味を持つものか等が
こちらに記載されているので、Fluentdに興味があればご参照ください。https://docs.fluentd.org/v1.0/articles/plugin-common-parameters
これでFluentdの設定は、完了となります。
③Kafkaによるデータ取得
Apache Kafkaとは
Apache Kafkaは、2011年にLinkedInから公開された
オープンソースの分散メッセージングシステムです。Kafkaは分散させることによって、大容量のデータ(ログやイベント等)を
高スループット且つ低レイテンシで、収集/配信することが出来ます。そして、「Apache ZooKeeper」を動作させることによって、
複数のKafkaサーバを連携させて、クラスタ化します。
クラスタ化により耐障害性/高可用性を実現します。
メッセージはファイルに保存され、クラスタ内でレプリカが作成されるため
データの損失を防ぐ事が出来ます。
(ログなども複数のサーバーから、どこか一箇所にまとめておく設定なども可能)そしてなんといってもスケーラブル性にすぐれており、
ダウンタイムを必要とせず拡張することができます。今回は、そんなApache Kafkaをシングルノードで動作させ、メッセージを収集/配信させます。
Kafkaへデータを送信する側はProducerと呼ばれ、データを受信する側をConsumerと呼びます。
Topicsとは、メッセージ連携するためのトンネルの様なイメージです。
Brokerはクラスタを構成し、ProducerとConsumerの間でメッセージの受け渡しを行う
キューとして動作します。「Producer」 「Consumer」 「KafkaBroker」 「ZooKeeper」 の考え方のイメージを図示してみました。
(messageを今回はSUMOデータとして図示しています)1,Kafkaのインストール
$ wget -O /tmp/kafka_2.11-1.0.0.tgz http://ftp.riken.jp/net/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz $ tar -xzf /tmp/kafka_2.11-1.0.0.tgz -C /opt $ mv /opt/kafka_2.11-1.0.0 /opt/kafka
2.ZooKeeperの起動
$ nohup /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties &
Zookeeperについて(Wikipedia引用)
Apache ZooKeeperは、Apacheソフトウェア財団のオープンソースプロジェクトで、
大規模分散システムでよく利用される、設定情報の集中管理や名前付けなどのサービスを
提供するソフトウェアです。最初は、Hadoopのサブプロジェクトの一つでしたが、
現在はトップレベルプロジェクトの一つになっています。ZooKeeperのアーキテクチャでは、高可用性を冗長サービスにより提供している。
つまり、クライアントはあるZooKeeperノードへの問い合わせが失敗したら、
他のノードに問い合わせることができます。データの更新は一つのマスターノードだけが行うようになっているので、
データがノード間で矛盾した内容になることはありません。
(ただし、最新のデータでない可能性はある)。更新を担当するマスターノードが、何らかの理由で停止した場合には、
各ノード間で選挙を行い、新たな更新ノードが選ばれます。ZooKeeperは、データを階層的な名前空間に保存しているが、
これはファイルシステムや、トライ木のデータ構造によく似ています。
クライアントは、このノードに読み書きを行うことによって、
設定情報共有などのサービスを提供してくれます。ZooKeeperは、「en:Rackspace」や「Yahoo!」などの企業で、
「Solr」のようなオープンソースのエンタープライズサーチシステムで、使用されています。それでは、ZooKeeperを起動したところで、次にkafkaの起動を行います。
3.Kafkaの起動
$ nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &
4.Topicの作成
$ /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sumo-data Created topic "sumo-data".
Created topic “sumo-data” と表示されれば、成功です。
5,作成したTopicがどのように配置されているかの確認
$ /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 sumo-data
6. パーティション構成の確認
$ /opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic sumo-data
7. 試しにkafka to kafka にてメッセージ送信をしてみる
$ /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sumo-data
※コマンド実行すると、プロンプトが入力待ち状態となります。
この状態で何か文字を打ち込むと、Kafka Brokerへメッセージを送信することができます。
例として、文字を入力してみましょう。$ /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sumo-data >CasleyConsulting
Producer(送信側)から、「CasleyConsulting」と文字を打ち込みました。
8番目の項目で、受信してみましょう。Kafkaを使用するにあたり、考えなければいけないことが少なくとも2つあります。
それは、メッセージの保証度と分散のさせ方です。メッセージの保証度に関して、☆acks 分散のさせ方については ☆Partitioner があり、
それぞれ簡単にですが説明いたします。☆acks
Kafkaにメッセージを送信した場合には、
Broker側がメッセージ受信を成功したとする条件を、設定することができます。
それが、acksというパラメーターです。acksにどのような値を設定すると、どのような動作になり、
メッセージ受信のどのタイミングで受信成功とするかを、簡単にまとめてみました。acks = 0 :
Producer側がメッセージを送信したとするタイミングで送信の成功とします。
つまり、Producer側がメッセージをソケットバッファの書き込みしたとするタイミングで
送信完了とするため、本当に書き込めているか分かりません。
逆に言えば、送信完了のタイミングが早いため、低レイテンシと言えます。
acks = 1 :
Producer側のメッセージ送信後、
KafkaBrokerに存在するリーダーパーティションへの送信完了時に送信完了とします。
レプリカパーティションへの同期を待たず、
送信完了とするため仮に送信完了時のタイミングで
リーダーパーティションノードへの障害が発生した場合にメッセージロストへ繋がります。
acks = all :
Producer側のメッセージ送信後、
レプリカパーティションまでの送信完了時に送信完了とします。
メッセージの保証度は高くなりますが、
一つ一つのメッセージの送信完了時間が伸びるため、
レイテンシがかなり遅くなります。ここからは個人の意見ですが、Kafkaを使用するということは、
ほとんどビックデータを扱うということなので、
メッセージ一つ一つの保証度はかなり低くても、量でカバーするイメージです。
つまりビックデータというものに関して言えばこの設定をする場合は、
ほとんど少ないのではないでしょうか。☆Partitioner
Partitionerを特に指定せずにメッセージをProduceすると、デフォルトのPartitionerが使用され、
KafkaPartitionにランダムでメッセージが振り分けられます。ラウンドロビン以外に、ID毎や何かの種類毎にメッセージを振り分けることによって、
Consumeした際に指定した値ごとのまとまったメッセージを取得することができます。注意点として、定期的なセンサーデータを例とした場合に、
KafkaのいずれかのPartitionにメッセージが偏ってしまうと、
分散している意味があまりなくなってしまいますので、
あくまで均等に分散を考えながらPartitionerをカスタムする事が重要です。今回のユースケースでは分散は行わず、シングルノードにて処理を行うため、
デフォルトPartitionerを使用します。8.メッセージを受信できているかの確認
$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sumo-data --from-beginning
コマンドを打ち込み、先ほど打ち込んだ「CasleyConsulting」という文字列が受け取れれば成功です。
・Fluentd と Kafkaを連携させ、定期的なセンサデータの投入を行う
今までの設定で、既にFluentdからKafkaへの連携を実現することができます。
/var/log/sumo_data/sumo_data.log 内に1秒ごとに、JSON形式のデータを投入することによって
自動でFluentdがメッセージを読み出し、Kafkaへと連携してくれます。試しに、 /var/log/sumo_data/sumo_data.log にJSONデータを記載してみましょう。
例) {“vehicleid”:00001,”speed”:40.9,”longitude”:35.642259,”latitude”:139.713458,”vehicletime”:”2018/11/04 16:42:01″}
記載したら、Kafkaにてメッセージが受信できていることを確認します。
$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sumo-data --from-beginning
無事受信出来ていることを確認できれば、成功です。
これでようやく、センサーデータ実現の環境構築が終わりました。
次に、SparkStreamingを使ってデータ格納をしていきます。④SparkStreamingを使用してKafkaからメッセージを取得
・SparkStreamingを使用してKafkaからセンサデータを取得し、
コンソールに表示させるサンプルアプリを作成以下のサンプルアプリを、pythonにて実装してみましょう。
# coding: UTF-8
from pyspark.sql import SparkSessionif __name__ == "__main__":
# SparkSessionの作成 (kafkaConsoleTest)
spark = SparkSession.builder.appName("kafkaConsoleTest").getOrCreate
spark.sparkContext.setLogLevel("WARN")# sumo-data トピックからメッセージを取得する
kafkaData = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "192.168.33.10:9092").option("subscribe", "sumo-data").load()# 文字列データとして取得する
kafkaDataString = kafkaData.selectExpr("CAST(value AS STRING) as value")# 変換した文字列をコンソール出力させる.
query = kafkaDataString.writeStream.outputMode("append").format("console").start()# 継続的に処理を実行する
query.awaitTermination()ファイル名を KafkaConsoleTest.pyとし、/opt/sparkapplication/ 内に保存したとして起動させる。
$ /usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 /opt/sparkapplication/kafkaConsoleTest.py
起動中に、Terminalを多重起動させ、
/var/log/sumo_data/sumo_data.log にJSON形式のサンプルデータを記載する。Fluentdが自動的に読み込み記載したサンプルデータを取得し、Kafkaへと流れる。
送信されたサンプルデータが、先ほど起動したSparkStreamingテストアプリケーションによって
取得できていれば成功です。それでは、SparkStreaming と Cassandraデータベースを連携させ、
ストリーミングでデータを格納する、アプリケーションの実装をする。(青枠部分の実装)
⑤Cassandraのインストールと起動
1.Cassandraをパッケージとしてインストール
/etc/yum.repos.d/cassandra.repo ファイルを作成
cassandra.repo [cassandra] name=Apache Cassandra baseurl=https://www.apache.org/dist/cassandra/redhat/311x/ gpgcheck=1 gpgkey=https://www.apache.org/dist/cassandra/KEYS
$ yum install -y cassandra
実行するとCassandraがインストールされる。
$ systemctl start cassandra
2,cassandraが起動しているか確認
$ systemctl start cassandra
3.pytzのインストール
Cassandraには、CQLを対話的に実行するcqlshというツールがあります。
(postgreSQLでいうところのpsql)
これを使用し、格納用のテーブルを定義します。cqlshの実行に必要な物としてPythonモジュールであるpytzが必要なため、インストールします。
以下コマンドを実行。$ pip install pytz
4,Cassandraタイムゾーンの設定
Cassandraでは、タイムスタンプ型で保持される時刻のタイムゾーンは、UTCとなっているため、
これを日本標準時に設定します。rootディレクトリに 「.cassandra」 というディレクトリを作成し、
cqlshの設定を行う、cqlshrcというファイルを作成します。このファイルはcqlshの設定ファイルとなり、ここでタイムゾーンの設定をします。
以下コマンドを実行。.cassandraディレクトリの生成
$mkdir ~/.cassandra
cqlshファイルの生成、編集
$ vi ~/.cassandra/cqlshrc
[cqlshrc] [ui] timezone=Asia/Tokyo
この設定をすれば、自分のサーバーと同じ日本標準時となります。
5.それではcqlshを起動し、テーブルを定義しましょう
cqlshコマンドを実行して、cqlshコマンドラインへ接続
$ cqlsh
6.テーブルを格納するキースペースの作成
$ cqlsh> CREATE KEYSPACE vehcle_data WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
キースペースを定義する際に、キースペースに格納されるデータがどのように複製されるかを
指定する必要があります。今回はreplication_factor に1を指定し、
データの複製が作成されずクラスタ内に、一つだけ作成されるようにしています。7. 作成したキースペースに移動し、sumodata格納用のテーブルの定義を行う
キースペースへ移動
$ cqlsh> USE vehicle_data
指定したキースペースからテーブルを定義
$ cqlsh:vehicle_data> CREATE TABLE IF NOT EXISTS vehicle_sensor_data ( vehicleid int, speed int, longitude float, latitude float, vehicletime timestamp, PRIMARY KEY (vehicleid, vehicletime) WITH CLUSTERING ORDER BY (vehicletime ASC);
PRIMARY KEY に、vehicletid と vehicletimeを指定することによって、
同じvehicleidでもvehicletime が違っていた場合に、違うレコードとして保存することができます。秒単位でデータを連携する場合など一意制約違反とならないように、
キーに「vehicleid」と「vehicletime」の2つを設定しています。SparkStreamingを使用し、先ほど定義したテーブルに定期的にデータを投入するアプリを実装します。
SparkからCassandraDBへの接続に、
今回は spark-cassandra-connector というライブラリを使用します。
spark-cassandra-connector 等のSpark用のライブラリは、
Spark Packages http://spark-packages.org にてダウンロードできます。⑥SparkStreamingを使用してDB格納操作を実装
それでは、Spark-cassandora-connector を使用してKafkaから送信されるストリームデータを
Cassandoraに格納していきます。SparkStreamingは、RDD単位で処理を行うため、
Cassandoraに格納する際には、DataFrameに変換する必要があります。
DataFrameに変換し、DataFrameWriterを使用してCassandoraへデータ格納していきます。RDD(Resilient Distributed Dataset)とは
不変(イミュータブル)で並列実行可能な(分割された)データセットです。
不変性、耐故障性、分散に優れ、基本的なSparkの処理単位となっています。特徴としては、不変性なため処理を行うたびに、現在のRDDに処理を実行することによって
子のRDDに変換されるという方向性となることです。つまり、子のRDDが親のRDDに戻ることがない、一方方向へと処理が流れることになります。
Sparkでは変換処理自体はDAGに記録されるだけで、
その後に行うアクション時に記録されたDAG情報から、
変換処理の最適な実行プランを立ててくれることで、
不要なデータがメモリ上に残らないように制御出来ます。DataFrameとは、
イメージとして、RDBのテーブルのようなイメージの構造化されたデータセットとなっています。
構造化データなため、クエリ処理の実行や管理が簡単に実現できます。
DataFrameを使用するメリットとして、RDDよりもクエリの実行計画を最適化してくれることにより、
性能が高いというところです。Kafkaから定期的にデータを受け取り、Cassandraへ格納する処理を実装します。
CassandraStorage.py
# coding: UTF-8
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtilsif __name__ == "__main__":
spark = SparkSession.builder.appName("CassandoraStorage").getOrCreate()
spark.sparkContext.setLoglevel("WARN")ssc = StreamingContext(spark.context, 1)
kafkaData = KafkaUtils.createDirectStream(ssc, ["sumo-data"], {"metadata.broker.list": "192.168.33.10:9092"})
schema = StructType().add("vehicleid",LongType()).add("speed",DoubleType()).add("latitude",DoubleType()).add("longitude",DoubleType()).add("vehicletime",StringType())
#Cassandra格納用のメソッドを定義
def storageCassandra(rdd):
if not rdd.isEmpty():
dataFrame = rdd.toDF(schema)
#Cassandraテーブル定義に従って変換
csDataFrame = dataFrame.withColumn("vehicleTime",fromunixtime(unix_timestamp(dataFrame.vehicleTime,"yyyy/MM/dd HH:mm:ss"))) \
.select("vehicleid", "speed", "longitude", "latitude", date_format("vehicleTime", "yyyyMMdd").alias("vehicleTime"))
#vehicle_dataキースペースvehicle_sensor_dataにデータを格納
csDataFrame.write \
.format("org.apache.spark.sql.cassandra") \
.mode("append") \
.options(table="vehicle_sensor_data", keyspace="vehicle_data") \
.sava()#Kafkaから取得したRDDを引数にCassandra格納用メソッドを呼び出す
kafkaData.map(lamdba (k, v): json.loads(v)).foreachRDD(storageCassandra)#ストリーミング処理の開始と継続を実行
ssc.start()
ssc.awaitTermination()それでは作成したプログラムを実行してみましょう。
作成したCassandraStorage.py を /opt/sparkapplication/ 内に保存し、以下を実行します。実行コード
$ /usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.6 /opt/sparkapplication/CassandraStorage.py
プログラムを実行し、データがvehicle_sensor_dataテーブル内に格納されているか調べてみましょう。
$ cqlsh cqlsh> use vehicle_data cqlsh:vehicle_data> SELECT * FROM vehicle_sensor_data;
データが継続的に格納されているのが確認できたら成功です。
これでセンサーデータを疑似的に作り出し、
継続的にCassandraデータベースに格納することができました。分散環境の考慮として、
SparkとCassandraの組み合わせは、分散環境によってかなりの効率を作り出します。
かなり前の日経 xTECH(クロステック)の記事ですが、(2015年くらい)
Cassandraについて分かりやすく解説してあります。https://tech.nikkeibp.co.jp/it/atcl/column/14/090100053/090800083/
NetFlix等で使用されているSpark-Cassandraですが、
トラフィックが非常に多い環境でも、Cassandraは無停止を実現できることに強みがあります。動画というかなり大きなビックデータを、
常に送り続けなければならないユースケースで停止することなく
速く動作させるのは、まさしくSpark-Cassandraの連携がなせる物と言えるでしょう。Cassandraというデータベースは、スケールアウトできるのが特徴で、
開発資金さえかければ(サーバー台数分)無限に分散させることができます。※ただ非効率な分散の仕方をすると、それこそ無限に開発資金の無駄遣いとなってしまいます。
そしてなんといっても、マスターノード等といった概念が無いため、
設定次第でデータロストを防ぐことができます。4台のサーバーによって分散させている環境を前提とすると、
サーバーのどれか1台が故障してしまった場合でも、
データをロストすることなく動作を続けることができ、
耐障害性、スケーラビリティー、 Spark等の分散処理との併用を求めるのであれば
CassandraはかなりオススメなDBとなっています。長々となりましたが、どれもビッグデータにおいて必要不可欠なものですので、
少しでも読んでくださった方の助けになれば幸いです。