SJ blog
backend
A

信頼度ランク

S 公式ソース確認済み
A 成功実績多数・失敗例少数
B 賛否両論
C 動作未確認・セキュリティリスク高
Z 個人所感

Amazon Kinesis Data Streams — リアルタイムデータパイプライン設計

Kinesis Data Streamsのシャード設計、パーティションキーとシャード割り当て、プロデューサー/コンシューマーAPI、Enhanced Fan-Out、KCL vs Lambda統合、データ保持期間とコスト最適化を解説。

一言結論

KinesisはSQSと異なりメッセージが削除されず複数コンシューマーが同じデータを処理できるため、順序保証・再処理・ファンアウトが必要なリアルタイムストリーミングに適しており、Enhanced Fan-Outを使えばコンシューマーごとに独立した2MB/秒のスループットを確保できる。

Kinesis Data Streams の概要

Kinesis Data Streamsはリアルタイムのデータストリーミングサービスだ。IoTデータ、ログ、クリックストリームなど大量のデータを低レイテンシーで処理できる。

Kinesis Data Streams の特性:
  → シャード単位でスループットを管理
  → デフォルト保持期間: 24時間(最大365日に延長可能)
  → 順序保証: パーティションキーごとに順序を保証
  → 複数コンシューマーが同じデータを読み取り可能
  → SQSとは異なりメッセージは削除されない(保持期間まで保存)

シャードとスループット

シャードあたりのスループット:
  書き込み: 1,000レコード/秒 または 1MB/秒(いずれか小さい方)
  読み取り: 2MB/秒(通常)または 2MB/秒/コンシューマー(Enhanced Fan-Out)

スループット計算例:
  要件: 10,000レコード/秒、平均200バイト/レコード
  
  レコード数ベース: 10,000 / 1,000 = 10シャード
  データ量ベース: 10,000 × 200 / 1,000,000 = 2MB/秒 → 2シャード
  
  → 多い方の10シャードを選択

オンデマンドモード(自動スケーリング):
  → シャード数を手動管理する代わりに自動スケーリング
  → 直近30日間のピークスループットの2倍まで自動拡張
  → コスト: 従量課金(シャードではなくデータ量で課金)

パーティションキーの設計

パーティションキーの役割:
  → 同じパーティションキーのレコードは同じシャードに入る
  → シャード内では順序が保証される
  → パーティションキーの分散がシャードの均等利用に影響する
  
ホットシャード問題:
  ❌ 悪い例: パーティションキーを "fixed-key" で固定
     → すべてのレコードが1つのシャードに集中
     → 他のシャードは使われない(ホットシャード)
  
  ✅ 良い例: ユーザーID、デバイスIDなど高カーディナリティなキーを使用
     → シャードに均等に分散
     → 高スループットを有効活用

プロデューサー API

import boto3
import json

kinesis = boto3.client('kinesis')

def put_records_batch(stream_name, records):
    """バッチでレコードを送信(最大500レコードまたは5MB)"""
    kinesis_records = [
        {
            'Data': json.dumps(record).encode('utf-8'),
            'PartitionKey': str(record.get('userId', 'default'))
        }
        for record in records
    ]
    
    response = kinesis.put_records(
        Records=kinesis_records,
        StreamName=stream_name
    )
    
    # 失敗レコードの再試行
    failed = response.get('FailedRecordCount', 0)
    if failed > 0:
        print(f"{failed} records failed, retrying...")
        failed_records = [
            records[i] for i, r in enumerate(response['Records'])
            if 'ErrorCode' in r
        ]
        return failed_records
    
    return []

Enhanced Fan-Out(強化されたファンアウト)

通常の読み取り:
  → 全コンシューマーでシャードあたり2MB/秒を共有
  → 例: 5コンシューマー × 1シャード = 各コンシューマー400KB/秒
  
Enhanced Fan-Out:
  → 各コンシューマーにシャードあたり2MB/秒を専有割り当て
  → 例: 5コンシューマー × 1シャード = 各コンシューマー2MB/秒
  → プッシュ型(HTTP/2)で低レイテンシー(約70ms)
  → コスト: $0.015/シャード時間 + $0.013/GB(通常の2倍程度)
  
Lambdaとの統合:
  → Lambda + Enhanced Fan-Outでリアルタイム処理
  → 複数のLambda関数が同じストリームを独立して処理可能

Lambda との統合

# Kinesis をLambdaのトリガーとして設定
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream \
  --function-name my-processor \
  --starting-position LATEST \
  --batch-size 100 \
  --bisect-batch-on-function-error true \
  --parallelization-factor 10 \
  --maximum-retry-attempts 3 \
  --destination-config '{
    "OnFailure": {
      "Destination": "arn:aws:sqs:...:my-dlq"
    }
  }'
重要なパラメータ:
  starting-position: LATEST(新規データから)または TRIM_HORIZON(最古から)
  parallelization-factor: シャードごとの並列処理数(1〜10)
  bisect-batch-on-function-error: 失敗時にバッチを2分割して特定

SQS vs Kinesis の選択基準

Kinesis Data Streams:
  ✅ 順序が重要(パーティションキーごと)
  ✅ 複数コンシューマーが同じデータを処理
  ✅ 大量データのリアルタイムストリーミング
  ✅ データ再処理(保持期間内)
  ✅ IoT/ログ/クリックストリーム
  
SQS:
  ✅ キューイング(メッセージを処理後に削除)
  ✅ ワーカー間でのジョブ分担
  ✅ DLQでの失敗管理
  ✅ シンプルなメッセージパッシング
  ✅ スケーリングが自動(シャード管理不要)

試験頻出ポイント

シナリオ回答
複数コンシューマーが同じデータを処理Kinesis Data Streams
シャードあたりの書き込みスループット1,000レコード/秒 または 1MB/秒
シャードごとに独立した2MB/秒の読み取りEnhanced Fan-Out
ホットシャードの防止高カーディナリティなパーティションキー
自動シャードスケーリングオンデマンドモード

まとめ

Kinesis Data Streamsはシャード単位でスループットを管理するリアルタイムストリーミングサービスだ。パーティションキーの分散でシャードを均等に使い、複数コンシューマーが必要な場合はEnhanced Fan-Outを使う。SQSと異なりメッセージは削除されず再処理できる。