こんにちは。SI部の遠山です。
業務でBigQueryに触る機会があったので、自身の整理も兼ねて入門記事を書いてみようと思います。
目次は、

  1. BigQueryとは
  2. 準備
  3. CLIで実行してみる
  4. APIライブラリから実行してみる

という流れで書いていきます。

1.BigQueryとは

BigQueryはGoogleのフルマネージドのクラウドサービスです。
その特徴は、分散処理により、①大量の構造化データに対してSQLクエリを、②超速に実行出来るという点です。
ビッグデータの分散処理と言うと、MapReduceが有名です。MapReduceはGoogleが技術論文として公表したものであり、その実装としてApachがオープンソースのHadoopを開発し、普及しました。
使用が広がるにつれてMapReduceの問題点としていくつかの物が挙げられるようになり、それに対応して様々なフレームワークや改善策が開発されてきました。例えば、

・データに対する問い合わせの内容を変える毎にJavaコードを書かなければならず、また工数がかかる
=> 処理をSQLライクに書く事によってMapReduceジョブに変換してくれるpigやhiveといったツール。
・処理毎にHDFSに中間データのIOが発生するので繰り返し実行やアドホックな実行に向かない
=> メモリキャッシュを利用するSpark。

まだまだありますがそれぞれだけで記事が書けそうな情報量です。
一方GoogleはMapReduce周辺の概念をBig Data Stack 1.0として、その上にBig Data Stack 2.0と呼ばれるインフラのコンポーネント群を追加しました。

・Colossus
=> 分散ファイルシステムだが、内容は公開されていない。
・Spanner
=> 地球規模でレプリケーションするデータストアで、原子時計で同期して地理間の時間順序問題を解決している。
・FlumeJava
=> ビッグデータコレクションに対して慣用句的なJavaコードを書けるシステム。MapReduceとして変換・最適化されて実行される。
・Dremel
=> Colossusなどに保存されているデータに対してクエリ実行出来る、分散SQLエンジン。

すなわち分散・並列処理をする上で、処理そのものの質を高める内側の方向を前提として、その規模を広げる方向への改善が2.0と言えるのかと思いました。
BigQueryはこのStack2.0の上に構築されたサービスであり、SQLエンジンとしてDremelを使用しています。
Dremelはこれらの地球規模の分散環境を利用して、数千台のディスクから並列にデータを読み込む事が出来ます。
またこれらのデータはColumnIOと呼ばれるフォーマットで保存されています。ColumnIOはその名前の通り列指向でデータを保持するもので、クエリで必要となる列だけを選択出来る事と、列内のデータの類似性による高い圧縮率によって、高速化を実現しています。
以上のようなGoogleならではの巨大な分散環境と、列指向のデータフォーマットによって高速なクエリ実行を可能にしているそうです。

前述したMapReduceから始まるフレームワーク群とBigQueryを比較すると、目的による使い分けになると思います。
ジョブの詳細なコーディングが必要とされるMapReduceは裏を返せば複雑な変換処理を大規模に行えるということです。スピードの要求されないバッチなどにはこちらが最適でしょう。
反対に、SQLによって表現出来る処理をアドホックに行いたいならBigQueryが役立つでしょう。

コストの問題も見過ごせません。BigQueryはColumnIOの名の通り、クエリ課金も列で読んだデータ量で計算されます。安いと言われるBigQueryですが、実行するクエリと使用するデータの形によってはコストが跳ね上がる事もあります。例えばクエリに使用する列が長い文字列になる場合などです。コストが上がる上にスピードがさほど要求されないようなケースでは既存のフレームワークの方が正解になる場合も十分あり得ます。

2.準備

さて前置きが長くなりましたが、実際に実行してみようと思います。
BigQueryを実行するにはGoogleアカウントと課金情報の登録が必要です。
ただ現在Googleのクラウドサービスには60日間+3万円分の無料体験があり、安心して試す事が出来ます。

まずGoogleのsdkページでgoogle-cloud-sdkをインストールする必要があります。
→GoogleCloudSDK(https://cloud.google.com/sdk/)
インストールの方法もこのページにあります。
インストールが終了したらコマンドが使用可能になりますので、

 gcloud auth login

とコマンド入力して認証に進みます。ブラウザが使える環境なら勝手にブラウザが立ち上がり、アカウント選択画面になります。

gcloudauth

開発者アカウントとして使う物を選択してください。

authcomp

これで登録は終了です。

画面右上のmy consoleをクリックするとdeveloper consoleに移動出来ます。
BigQueryは左下「ビッグデータ」欄にあり、クリックするとWebUIが現れます。

webui

新しいprojectやdatasetを設定する時はWebUIが分かりやすくて良いと思います。
クエリ実行もここで出来るので何が出来て何が出来ないのか雰囲気をつかむのに良いです。
ちなみに左下のpublic:samplesはサンプルデータなので、最初から使う事が出来ます。
左上のCOMPOSE QUERYからサンプルデータにクエリ実行してみます。
wikipediaの更新履歴データ約3億件を対象にタイトルとコメントをselectしてみます。

webuirunquery

BigQueryのデータ階層はproject→dataset→tableとなっています。クエリ実行時に使うテーブルはprojectが選択されている事を前提として、dataset名.table名といった指定になるのでSSのようなFROM句になります。
2.5秒で完了し、18GBをシークした旨が表示されています。
簡単なクエリですが3億件を対象にを2.5秒で終了ですので、早さの一端が伺えますね。

WebUIでも十分色々な事が出来ますが、自動化や処理に組み込む事を考えると、CLIやコードからの実行がしたくなります。以降ではそれを試してみようと思います。

3.CLIで実行してみる

準備で使用したCLI環境をそのまま使用出来ます。
google-cloud-sdkにはBigQuery実行コマンドも含まれており、bqから始まるコマンドがそれにあたります。とりあえず、

 bq ls

と入力してプロジェクトが表示出来ればうまく認証されています。
CLI環境で使用するprojectを固定するためのコマンドを入力します。

 gcloud config set project YOUR_PROJECT_ID

でプロジェクトセット完了です。

さてCLIの使い方ですが、公式のCLIページ(→https://cloud.google.com/bigquery/bq-command-line-tool)は網羅的ではないので、CLIのhelpを見るのがベストだと思います。

 bq help

とすれば使えるコマンド一覧で表示されます。さらに表示されたコマンド毎の詳細も表示出来ます。例えばqueryコマンドの詳細を知りたいなら

 bq help query

と入力します。使用出来るオプションなどが一覧で表示出来ます。
それではさきほどのクエリをCLIから実行してみます。
(※この先のSSなのですが、表全体を表示するために大変文字が小さくなっています。申し訳ないのですが拡大するなどしてご覧ください)

bqcliquery

実行出来ました。結果は標準出力されています。

次にオプションをつけて実行してみます。次の例ではジョブIDを振って(自分でIDを振らない場合は自動でランダムな文字列になります)実行しつつ、実行結果を別テーブルに保存します。しかし保存先のtableが無いのでまずはdatasetとtable作成コマンドから実行してみましょう。

bqcli-dataset-table

tableを作成する時はschemaを設定します(schemaが長い場合はjson形式などのファイルにして読み込ませる事で指定する事も出来ます)。
successfullyに作成出来たので、結果を保存するオプションをつけつつ実行してみます。

bqcliequery-table

WebUIで確認してみましょう。

bqcli-resultonwebui

ちゃんと保存されているようです。

他にもバルクロードや、テーブルデータのエクスポートなどもCLIから行う事が出来ますので、シェルスクリプトと組み合わせるなどすれば色々な操作が出来そうです。簡単にコマンドだけ紹介しておきます。

バルクロード

bqload

バルクロードしたデータをそのまま新テーブルとします。そのため引数にテーブル名とスキーマを渡しています。

エクスポート

bqextract

エクスポートはGoogleCloudStorageに向けて行いますので、バケット名を渡しています。

実行時間を見てもらえれば分かる通り、案外長くかかります。リアルタイムに結果が欲しいケースでは使いにくい速度です(ネット環境にもよると思います)。
一応ロードに関しては手元→BQとアップロードするのではなく、予めGoogleCloudStorageにアップしておきGCS→BQとロードすると早くはなるようです。

CLIでのクエリ実行結果をそのままファイルとして手元にダウンロードする機能は調べた限りありません。一応標準出力の形式をcsvやjsonに整形するオプションはありますので、リダイレクトと各種コマンドで出来ない事は無いですが、結果の行が大きい場合はクエリ結果を一旦テーブルとして保存し、そのテーブルをGoogleCloudStorageにエクスポートしてから手元にダウンロードするという手段になります。そういった場合に上記のエクスポート機能が使えます。

最後に、APIのClientLibraryを使ってコードから実行してみます。

4.コードから実行してみる

BigQueryにデータをロードする場合はGoogleCloudStorageからや、手元からのアップロードに加えて、ストリームとして挿入する機能もあります(→https://cloud.google.com/bigquery/streaming-data-into-bigquery)。

今回は信頼と実績のfluentdを使ってこの機能を試しつつ、コードからクエリ実行をしてみようと思います。

td-agent2(≒fluentd)
+ fluent-plugin-twitter (0.4.1)
+ fluent-plugin-bigquery(0.2.11)

virtual box上のCentOSで上記を動作させました。
本筋では無いのでtd-agentの詳細は省きますが、inputとしてtwitter上のツイートをひたすら拾って、outputとしてBigQueryにinsertするというプラグインの組み合わせです。
この設定でtd-agentを実行する事により常時データがBigQueryにinsertされている状態になります。
一方実行側は、ScalaとGoogle Api Client Library(for Java)を使って以下のようにコーディングしました。

TweetAggregationExample.scala

package jp.co.casleyconsulting
import java.io.{FileWriter, PrintWriter}
import scala.util.{Failure, Success}
import collection.JavaConversions._
import jp.co.casleyconsulting.constants.Value._

object TweetAggregationExample {

  def main(args: Array[String]) {

    //BigQueryClient取得
    val bigquery = GoogleServiceClient.getClient match {
      case Success(v) =>
        println("Creating-BQ-client succeeded.")
        v
      case Failure(e) =>
        e.printStackTrace
        println("Creating-BQ-client failed.")
        sys.exit(1)
    }

    //Query実行
    val resultRows = QueryJobHandler.executeQuery(bigquery, getQueryString(args(1).split(","))) match {
      case Success(v) =>
        println("Query-executing Succeeded.")
        v
      case Failure(e) =>
        e.printStackTrace
        println("Query-executing failed.")
        sys.exit(1)
    }

    //結果をファイルに出力
    val writer = new PrintWriter(new FileWriter(RESULT_PATH, true))
    resultRows.map(row =>
      writer.println(row.getF.map(cell => cell.getV).mkString(",") + s",${args(0)}")
    )
    writer.close
  }

}

GoogleServiceClient.scala

package jp.co.casleyconsulting
import java.io._
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import com.google.api.client.http.HttpTransport
import com.google.api.client.http.javanet.NetHttpTransport
import com.google.api.client.json.JsonFactory
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.bigquery.{Bigquery, BigqueryScopes}
import com.google.common.collect._
import jp.co.casleyconsulting.constants.Value._
import scala.util.{Try}

object GoogleServiceClient {

  private val TRANSPORT: HttpTransport = new NetHttpTransport
  private val JSON_FACTORY: JsonFactory = new JacksonFactory

  def getClient: Try[Bigquery] = Try {
    //p12ファイルで認証してクライアント作成して返す
    val credentials = new GoogleCredential.Builder()
      .setTransport(TRANSPORT)
      .setJsonFactory(JSON_FACTORY)
      .setServiceAccountId(SERVICE_ADDRESS)
      .setServiceAccountScopes(ImmutableList.of(BigqueryScopes.BIGQUERY))
      .setServiceAccountPrivateKeyFromP12File(new File(P12PATH)).build

    new Bigquery.Builder(TRANSPORT, JSON_FACTORY, credentials)
      .setApplicationName("TweetAggregationExample")
      .setHttpRequestInitializer(credentials)
      .build

  }
}

QueryJobHandler.scala

package jp.co.casleyconsulting

import com.google.api.services.bigquery.Bigquery
import com.google.api.services.bigquery.model._
import jp.co.casleyconsulting.constants.Value._
import collection.JavaConversions._
import scala.util.Try

object QueryJobHandler {
  def executeQuery(bigquery: Bigquery, queryString: String): Try[Seq[TableRow]] = Try {
    //クエリジョブにタイムリミットを設定して実行する
    val queryJob = bigquery.jobs.query(
      PROJECT_ID,
      new QueryRequest().setTimeoutMs(TIMEOUT_LIMIT * 1000L).setQuery(queryString)
    ).execute

    //クエリジョブ結果を参照して結果Rowsを返す
    bigquery.jobs.getQueryResults(
      queryJob.getJobReference.getProjectId,
      queryJob.getJobReference.getJobId).execute.getRows.toSeq
  }
}

Value.scala

package jp.co.casleyconsulting.constants

object Value {
  val PROJECT_ID: String = "MY_PROJECT_ID"
  val SERVICE_ADDRESS: String = "MY_SERVICE_ACCOUNT_ADDRESS"
  val P12PATH: String = "MY_P12KEY_PATH"
  val RESULT_PATH: String = "OUTPUT_PATH"
  val TIMEOUT_LIMIT = 20

  def getQueryString(words: Array[String]): String = {
    s"""SELECT sum(word1), sum(word2), sum(word3), sum(word4) FROM
       (SELECT INTEGER(count(1)) AS word1, 0 AS word2, 0 AS word3, 0 AS word4 FROM [bq_test.stream_data@-600000--1] WHERE message CONTAINS('${words(0)}')),
       (SELECT 0 AS word1, INTEGER(count(1)) AS word2, 0 AS word3, 0 AS word4 FROM [bq_test.stream_data@-600000--1] WHERE message CONTAINS('${words(1)}')),
       (SELECT 0 AS word1, 0 AS word2, INTEGER(count(1)) AS word3, 0 AS word4 FROM [bq_test.stream_data@-600000--1] WHERE message CONTAINS('${words(2)}')),
       (SELECT 0 AS word1, 0 AS word2, 0 AS word3, INTEGER(count(1)) AS word4 FROM [bq_test.stream_data@-600000--1] WHERE message CONTAINS('${words(3)}'))"""
  }
}

実行するだけのコードなので流れはとても単純です。GoogleServiceClientクラスで認証を行い、認可されたクライアントを返し、そのクライアントを使ってQueryJobHandlerクラスでクエリを実行して、結果のコレクションを返します。
(※今回はP12keyファイルを使っています。これは認証用の鍵ファイルで、developer consoleから、APIと認証->認証情報->認証情報を追加->サービスアカウントと進めば取得出来ます。その時にサービスアカウント用のメルアドも発行されます。ClientLibraryなどで使うアドレスはこちらの物で、Googleアカウントのメールアドレスとは別です。)

結果のコレクションの中身は以下のようになっています。
List[TableRow]・・・(結果)
TableRow.getF -> List[TableCell]・・・(1行に対してgetFするとセルのコレクションとして取得出来る)
TableCell.getV -> 値・・・(1セルに対してgetVすると具体的な値が取得出来る)

これらの値を分解して、実行時刻をくっつけて、最終的にcsvとして出力しています。
クエリそのものは定数クラスに書いてある通りです。メインに渡したカウントしたいワード4つをツイートから抽出します。td-agentによってツイートデータがBigQuery側に保存されている(FROM句のstream_dataがそのテーブルです)ので、それに対して上記クエリを実行します。
そして今回はFROM句に細工があります。
[bq_test.stream_data@-600000–1]
この、「@-600000–1]が何かというと、BigQueryのテーブルデコレーターという機能です。
これはテーブルにデータがインサートされた時間をミリ秒単位して指定して、その部分にだけクエリを実行するという機能になります。
[-600000–1]は、[-600000 から -1]という意味です。つまり10分前〜1ミリ秒前にstream_dataテーブルにインサートされたデータのみを対象としてクエリ実行します(これは相対指定ですが絶対指定も出来ます)。
なぜこのような指定を試したかというと、折角リアルタイム処理に適したstream-insert機能なので、10分毎の速報値を取ってみようと思ったからです。
ちなみにリアルタイムに拘らずに時間別のツイート数が知りたいのであれば、元々TwitterAPIに呟いた時間の出力がありますので、このようにする必要はありません(笑)。あくまで例という事で。
検索ワードはなんでも良いのですが、時系列的に変化しそうな物が良いと思うので、”日曜” “寒い” “終電” “これから”にしてみます。
それではシェルでラップして10分毎のcron実行で一晩放置してみようと思います。

ZZZ…

さて一晩置いて結果はどのようになったでしょうか。

bqlog

ログを見ると全て正常終了しているようです。

結果のcsvを見てみたのですが、「これから」のツイートが多すぎます。「これから〜行く」などの予定を表す意味の使い方がだんだんと減るのではと思ったのですが、「これからもよろしく」などの用法も拾ってしまいますね。単語の選択がよろしく無かったです。

というわけで残り3つのワードについてグラフにしてみました。

bqstreamresult

結果を見てみると日付変更のあたりで「日曜」の呟きが増えているのが分かります。「終電」「寒い」は案外増えていないですね。

「終電」はもっと増えそうなものなのですが、そもそも手軽に試せるレベルではTwitterのStreamAPIで取得出来る件数がそれほど多く無い(一晩で30万件程度でした)ので、グラフでよく分かるほどの増加は無いという事でしょうか。
ゴールデンタイムに芸能人の名前やテレビ番組名でやってみると数も多くなりそうで面白いかも知れないですね。
いずれにせよBigQueryのStreaming-insertとそれに対するコードからの実行を試せたので入門としては良かったかなと思いました。

最後に

さて長々書いてきましたが、まだまだ心残りもあります。特にコードからの実行は出来る事が多いです。今回のクエリ実行の方法は最も手軽な方法です。Job(JobはAPI上で抽象的な概念で、クエリだけでなくロードやテーブル操作も含む)に設定内容をベタベタ貼り付けて1から作る方法もあります。そのようにクエリジョブを作った場合は色々なオプションや状態追跡が出来るので柔軟な実行が可能です。
そちらも試したかったのですが、残念ながらブログには間に合いませんでした。引き続き無料期間の終了までに試したいと思います。
最初にも言いましたが、無料期間が60日と長く3万円分を使い切る事も無いと思いますので、皆さんもぜひ体験してみてください。
それではここまで読んでいただきありがとうございました。