backend
A
信頼度ランク
| S | 公式ソース確認済み |
| A | 成功実績多数・失敗例少数 |
| B | 賛否両論 |
| C | 動作未確認・セキュリティリスク高 |
| Z | 個人所感 |
Amazon Kinesis Data Streams — リアルタイムデータパイプライン設計
Kinesis Data Streamsのシャード設計、パーティションキーとシャード割り当て、プロデューサー/コンシューマーAPI、Enhanced Fan-Out、KCL vs Lambda統合、データ保持期間とコスト最適化を解説。
一言結論
KinesisはSQSと異なりメッセージが削除されず複数コンシューマーが同じデータを処理できるため、順序保証・再処理・ファンアウトが必要なリアルタイムストリーミングに適しており、Enhanced Fan-Outを使えばコンシューマーごとに独立した2MB/秒のスループットを確保できる。
Kinesis Data Streams の概要
Kinesis Data Streamsはリアルタイムのデータストリーミングサービスだ。IoTデータ、ログ、クリックストリームなど大量のデータを低レイテンシーで処理できる。
Kinesis Data Streams の特性:
→ シャード単位でスループットを管理
→ デフォルト保持期間: 24時間(最大365日に延長可能)
→ 順序保証: パーティションキーごとに順序を保証
→ 複数コンシューマーが同じデータを読み取り可能
→ SQSとは異なりメッセージは削除されない(保持期間まで保存)
シャードとスループット
シャードあたりのスループット:
書き込み: 1,000レコード/秒 または 1MB/秒(いずれか小さい方)
読み取り: 2MB/秒(通常)または 2MB/秒/コンシューマー(Enhanced Fan-Out)
スループット計算例:
要件: 10,000レコード/秒、平均200バイト/レコード
レコード数ベース: 10,000 / 1,000 = 10シャード
データ量ベース: 10,000 × 200 / 1,000,000 = 2MB/秒 → 2シャード
→ 多い方の10シャードを選択
オンデマンドモード(自動スケーリング):
→ シャード数を手動管理する代わりに自動スケーリング
→ 直近30日間のピークスループットの2倍まで自動拡張
→ コスト: 従量課金(シャードではなくデータ量で課金)
パーティションキーの設計
パーティションキーの役割:
→ 同じパーティションキーのレコードは同じシャードに入る
→ シャード内では順序が保証される
→ パーティションキーの分散がシャードの均等利用に影響する
ホットシャード問題:
❌ 悪い例: パーティションキーを "fixed-key" で固定
→ すべてのレコードが1つのシャードに集中
→ 他のシャードは使われない(ホットシャード)
✅ 良い例: ユーザーID、デバイスIDなど高カーディナリティなキーを使用
→ シャードに均等に分散
→ 高スループットを有効活用
プロデューサー API
import boto3
import json
kinesis = boto3.client('kinesis')
def put_records_batch(stream_name, records):
"""バッチでレコードを送信(最大500レコードまたは5MB)"""
kinesis_records = [
{
'Data': json.dumps(record).encode('utf-8'),
'PartitionKey': str(record.get('userId', 'default'))
}
for record in records
]
response = kinesis.put_records(
Records=kinesis_records,
StreamName=stream_name
)
# 失敗レコードの再試行
failed = response.get('FailedRecordCount', 0)
if failed > 0:
print(f"{failed} records failed, retrying...")
failed_records = [
records[i] for i, r in enumerate(response['Records'])
if 'ErrorCode' in r
]
return failed_records
return []
Enhanced Fan-Out(強化されたファンアウト)
通常の読み取り:
→ 全コンシューマーでシャードあたり2MB/秒を共有
→ 例: 5コンシューマー × 1シャード = 各コンシューマー400KB/秒
Enhanced Fan-Out:
→ 各コンシューマーにシャードあたり2MB/秒を専有割り当て
→ 例: 5コンシューマー × 1シャード = 各コンシューマー2MB/秒
→ プッシュ型(HTTP/2)で低レイテンシー(約70ms)
→ コスト: $0.015/シャード時間 + $0.013/GB(通常の2倍程度)
Lambdaとの統合:
→ Lambda + Enhanced Fan-Outでリアルタイム処理
→ 複数のLambda関数が同じストリームを独立して処理可能
Lambda との統合
# Kinesis をLambdaのトリガーとして設定
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream \
--function-name my-processor \
--starting-position LATEST \
--batch-size 100 \
--bisect-batch-on-function-error true \
--parallelization-factor 10 \
--maximum-retry-attempts 3 \
--destination-config '{
"OnFailure": {
"Destination": "arn:aws:sqs:...:my-dlq"
}
}'
重要なパラメータ:
starting-position: LATEST(新規データから)または TRIM_HORIZON(最古から)
parallelization-factor: シャードごとの並列処理数(1〜10)
bisect-batch-on-function-error: 失敗時にバッチを2分割して特定
SQS vs Kinesis の選択基準
Kinesis Data Streams:
✅ 順序が重要(パーティションキーごと)
✅ 複数コンシューマーが同じデータを処理
✅ 大量データのリアルタイムストリーミング
✅ データ再処理(保持期間内)
✅ IoT/ログ/クリックストリーム
SQS:
✅ キューイング(メッセージを処理後に削除)
✅ ワーカー間でのジョブ分担
✅ DLQでの失敗管理
✅ シンプルなメッセージパッシング
✅ スケーリングが自動(シャード管理不要)
試験頻出ポイント
| シナリオ | 回答 |
|---|---|
| 複数コンシューマーが同じデータを処理 | Kinesis Data Streams |
| シャードあたりの書き込みスループット | 1,000レコード/秒 または 1MB/秒 |
| シャードごとに独立した2MB/秒の読み取り | Enhanced Fan-Out |
| ホットシャードの防止 | 高カーディナリティなパーティションキー |
| 自動シャードスケーリング | オンデマンドモード |
まとめ
Kinesis Data Streamsはシャード単位でスループットを管理するリアルタイムストリーミングサービスだ。パーティションキーの分散でシャードを均等に使い、複数コンシューマーが必要な場合はEnhanced Fan-Outを使う。SQSと異なりメッセージは削除されず再処理できる。