database
A
信頼度ランク
| S | 公式ソース確認済み |
| A | 成功実績多数・失敗例少数 |
| B | 賛否両論 |
| C | 動作未確認・セキュリティリスク高 |
| Z | 個人所感 |
DynamoDB Streams — 変更データキャプチャ・Lambda連携・レプリケーション設計
DynamoDB Streamsのストリームビュータイプ(KEYS_ONLY/NEW_IMAGE/OLD_IMAGE/NEW_AND_OLD_IMAGES)、Lambda関数トリガー、ストリームの保持期間、Kinesis Data Streamsとの違いを解説。
一言結論
DynamoDB StreamsはCDCパターンの実装基盤として検索エンジン同期・集計テーブル更新・クロスリージョンレプリケーションに使えるが、保持期間は24時間固定であるため長期保持が必要な場合はKinesis Data Streamsへの転送を組み合わせる必要がある。
DynamoDB Streamsとは
DynamoDB Streamsはテーブルへの変更(書き込み、更新、削除)を時系列でキャプチャする機能だ。CDC(Change Data Capture)パターンをDynamoDBで実装する基盤となる。
保持期間: 24時間
読み取り保証: at-least-once(重複あり得る)
順序保証: 同一パーティションキー内では順序が保証される
4種類のストリームビュー
KEYS_ONLY:
変更があったアイテムのキー(PK・SK)のみ記録
ストレージコスト最小
NEW_IMAGE:
変更後のアイテムの全属性を記録
「最新状態への同期」に向く
OLD_IMAGE:
変更前のアイテムの全属性を記録
「変更前の状態を知りたい」監査ログ等に向く
NEW_AND_OLD_IMAGES:
変更前・変更後の両方を記録
「何が変わったか」を詳細に記録
ストレージコスト最大
Lambda関数とのトリガー設定
# DynamoDB StreamsのLambdaトリガー設定
aws lambda create-event-source-mapping \
--function-name stream-processor \
--event-source-arn arn:aws:dynamodb:ap-northeast-1:123456789012:table/Orders/stream/2026-04-08T00:00:00.000 \
--starting-position LATEST \
--batch-size 100 \
--maximum-retry-attempts 3 \
--bisect-batch-on-function-error \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:ap-northeast-1:123456789012:stream-dlq"}}'
# Lambda関数でDynamoDB Streamsを処理
def handler(event, context):
for record in event['Records']:
event_name = record['eventName'] # INSERT, MODIFY, REMOVE
if event_name == 'INSERT':
new_item = record['dynamodb']['NewImage']
handle_new_order(deserialize(new_item))
elif event_name == 'MODIFY':
old_item = record['dynamodb']['OldImage']
new_item = record['dynamodb']['NewImage']
handle_order_update(deserialize(old_item), deserialize(new_item))
elif event_name == 'REMOVE':
old_item = record['dynamodb']['OldImage']
handle_order_deletion(deserialize(old_item))
def deserialize(dynamo_item):
"""DynamoDB形式のアイテムを通常のdictに変換"""
from boto3.dynamodb.types import TypeDeserializer
d = TypeDeserializer()
return {k: d.deserialize(v) for k, v in dynamo_item.items()}
ユースケースパターン
パターン1: エラスティックサーチへのデータ同期
def handler(event, context):
es_client = Elasticsearch(['https://my-es-cluster.es.amazonaws.com'])
for record in event['Records']:
if record['eventName'] in ['INSERT', 'MODIFY']:
new_item = deserialize(record['dynamodb']['NewImage'])
es_client.index(
index='products',
id=new_item['productId'],
body=new_item
)
elif record['eventName'] == 'REMOVE':
item_id = record['dynamodb']['Keys']['productId']['S']
es_client.delete(index='products', id=item_id)
パターン2: 集計テーブルのリアルタイム更新
def handler(event, context):
dynamodb = boto3.resource('dynamodb')
stats_table = dynamodb.Table('DailyStats')
for record in event['Records']:
if record['eventName'] == 'INSERT':
new_order = deserialize(record['dynamodb']['NewImage'])
date = new_order['orderDate'][:10] # YYYY-MM-DD
# 原子的に集計を更新
stats_table.update_item(
Key={'date': date},
UpdateExpression='ADD orderCount :one, totalRevenue :amount',
ExpressionAttributeValues={
':one': 1,
':amount': Decimal(str(new_order['total']))
}
)
パターン3: クロスリージョンレプリケーション
DynamoDB Streamsを使って別リージョンのテーブルにデータをレプリケートする(DynamoDB Global Tablesと比べて細かな制御が可能)。
def handler(event, context):
# バージニアリージョンのDynamoDBに書き込み
dynamo_us = boto3.resource('dynamodb', region_name='us-east-1')
replica_table = dynamo_us.Table('Orders-Replica')
with replica_table.batch_writer() as batch:
for record in event['Records']:
if record['eventName'] in ['INSERT', 'MODIFY']:
item = deserialize(record['dynamodb']['NewImage'])
batch.put_item(Item=item)
elif record['eventName'] == 'REMOVE':
keys = {k: deserialize({k: v}) for k, v in
record['dynamodb']['Keys'].items()}
batch.delete_item(Key=keys)
Kinesis Data Streamsへの転送
DynamoDBの変更をKinesis Data Streamsに転送することもできる。
# KDS転送の有効化(1シャードから最大1000シャード)
aws dynamodb enable-kinesis-streaming-destination \
--table-name Orders \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/dynamo-changes
DynamoDB Streams vs KDS:
DynamoDB Streams:
保持: 24時間
コンシューマー: 最大2つ(Lambdaトリガー推奨)
シャード管理: 自動
Kinesis Data Streams:
保持: 24時間〜365日(設定可能)
コンシューマー: 多数(KCL, Firehose, Lambda等)
シャード管理: 手動設定
コスト: 追加料金
シャードの仕組み
DynamoDB Streamsはパーティションと同様にシャードで管理される
同一パーティションキーのイベントは同じシャードに入る
→ Lambda関数は同一PK内の順序でイベントを受け取れる
→ 異なるPK間の順序は保証されない
試験頻出ポイント
| シナリオ | 回答 |
|---|---|
| ストリームの保持期間 | 24時間 |
| 変更前後の両方を記録したい | NEW_AND_OLD_IMAGES |
| ElasticSearchへのリアルタイム同期 | Streamsに Lambda trigger |
| ストリームのデータをより長期保持 | Kinesis Data Streamsへ転送 |
| 同一PK内の順序は保証されるか | 保証される |
まとめ
DynamoDB StreamsはCDCパターンの実装基盤として、ElasticSearch同期、集計テーブル更新、クロスリージョンレプリケーションなど多様なユースケースに活用できる。24時間の保持期間が短い場合はKDSへの転送で解決できる。