メモ: AWS Black Belt Tech Webinar「Amazon Kinesis」

最近、2年くらいブランクのあるAWSをキャッチアップしようとしています。今夜は、Amazon Kinesisがテーマのwebinerがあることに気づいたので聴いてみました。Kinesisって例のヘンな形したキーボードのことじゃなかったんですね。

5分くらい遅刻しましたが、講師(榎並さん)のお話をメモしたので、アップしときます。

Kinesisとは

Amazon Kinesisは、大規模なデータを投入してリアルタイムにストリーム処理したいときに使うサービス。現在public betaで、バージニア北部リージョンだけ利用できる模様。

  • StreamとShard
    • 用途単位の土管がStream
    • その中のバケツがShard
    • Streamには必ず1つ以上のShardが含まれる
  • Shardの数でスケール制御
  • CloudWatchで監視できる
  • Shardの利用時間とputされたデータ数に対して課金される

データ入力

  • PutRecord APIを使ってデータをKinesisに入力する
  • Kinesisは受け取ったデータをShardに振り分ける
  • KinesisはDataRecordにStream内でユニークなシーケンス番号をつける
    • 時間経過で増加する値なので、Shardの中で1ずつ増えるわけではない
    • put時にSequenceNumberForOrderingパラメータをつければ、厳密に昇順になる
  • Log4J Appender: Log4Jの出力をKinesisに入力

データの取得と処理

  • GetShardIterator API、GetRecords APIを使ってデータを取得する
    • 二段構成で取得
      • Shard名、タイプを指定して、ShardIteratorを取得
      • Iteratorを使ってデータを取得
  • ShardIteratorType: 4種類(定数値はAPI参照)
    • 指定のシーケンス番号から
    • 指定のシーケンス番号以降から
    • Shardの最も古いデータから
    • 最新データから
  • Kinesis Client Library(Java)を使うと、もっと可用性を高める処理ができる
    • https://github.com/awslabs/amazon-kinesis-client
    • チェックポイント(データをどこまで読み込んだかを示すシリアル番号)の管理にDynamoDBを利用
    • 実行するとWorkerスレッドが動く
    • IRecordProcessorを実装して利用
    • 可用性の実現
      • 複数のKinesisアプリが動いている場合、あるインスタンスが動かないと、別のインスタンスが処理を引き継ぐ
      • DynamoDB内のチェックポイントの更新を監視して実現しているらしい
      • 新しいShardが追加されると検知できる
    • 同じターゲットストリームに対して、複数のアプリを作ることができる
      • DynamoDBに複数のテーブルが作られるので、それぞれどこまで読んだかを管理できる
  • Kinesis Connector Library(Java)を使うと、他のサービスとの連携が容易になる
  • Kinesis Storm Spoutを使うと、Apache Storm(http://storm.incubator.apache.org/)と組み合わせやすくなる
  • EMR Connectorを使うと、Hadoopのツールを使ってデータを取得、処理できる

Kinesisの運用

  • CloudWatchでメトリクス監視
  • Shardの分割とマージ
    • SpritShard API、MergeShard API
    • 業務量が多い時間帯に分割、少なくなったらマージ
    • Shardを立てれば立てるほど受けられるデータは増えるが、課金も増えるので、入力量に応じたShardを立てるのがポイント
      • 分割: 担当するハッシュキーの開始値を指定して分割する
      • マージ: マージ対象のSharedを指定するだけ

サンプルアーキテクチャの紹介

  • ターゲットのストリームに対して複数のクライアントがあるケース
  • Streamをパイプラインのようにつなぐケース
    • 複数のData Sourceをそれぞれクレンジングするなど
  • その他のAWSサービス(S3、RedShift、RDS、DynamoDB)と連携するケース

事例

Kinesisの使いどころ

  • SQSとの使い分けは?
    • SQS: 1つの処理を分担
    • Kinesis: 用途に応じてアプリを配置
  • Kinesisの特徴
    • Pub-Subメッセージモデル
    • ユニークなシーケンス番号が振られる
  • Kinesisを何に使う?
    • リアルタイム情報の可能化に
    • 同じデータを使った長期分析に(機械学習との組み合わせ)
    • 後続に続くデータ処理の事前処理に(ETL処理)
    • 大量データの一次バッファに(ただしSQSでも同じことができる)

質疑応答

Shardのキャパシティ超えるとPUTリクエストどうなりますか?
エラーがPUTする側に返るので、Shardを分割してください。Shardの監視は今後対応予定ですが、今のところは事前に流量の多い場合のデータ量を想定して、Shardを立ててください。
DynamoDBのキャパシティ設定は?
どれくらいのデータをバッファ、エミットするか次第です。状況をみてプロビジョンスループットを変えてください。
PUTの手前にバッファやリトライ機能を置くべきですか?
将来Shardの状況を取得する機能が追加されれば不要です。今のところは、手前の処理の可用性など、システム全体を考慮して判断ください。
PUTから処理までにデータが抜ける可能性はありますか。
データにシーケンス番号が付与された状態であれば3つのAZで保存されるので、大丈夫です。ただし、PUT漏れが発生した場合は、処理対象のデータが抜けることがありえます。
Kinesisアプリのデプロイ先はどこですか?
理想的にはEC2ですが、EC2以外からでも使えます。
Shardのキャパシティチェックの頻度は?
チェック機構自体が現在のところ存在しません。

最後の質問について、じゃあShardのキャパシティオーバのエラーはどうやって出してるんだろ?と思いましたが、なんか別の機構なんですかね。(聞き逃してしまったかも)

そんな感じのお話でした。勉強になりました。ありがとうございました。