上級 40分 Lesson 14

Detective・EventBridge・インシデントレスポンス

Detective調査フロー、EventBridgeセキュリティ自動化、インシデントレスポンスパターンを徹底解説

AWS Detective EventBridge SCS-C03 インシデントレスポンス Security

Amazon Detective — セキュリティ調査の最後の砦

Detective概要:何ができるのか

Amazon Detectiveは、GuardDutyなどのセキュリティサービスが検出した疑わしい動きの原因を調べるための専門ツールです。

GuardDuty Finding(検出)

Detective(原因調査)

セキュリティチームが対応判断

Detectiveの本質

  • GuardDutyは「異常を検知する犬」
  • Detectiveは「異常の根拠を調べる刑事」

データソースと自動ビヘイビアグラフ構築

Detectiveが活用するデータソースは以下の通り:

データソース用途保持期間リアルタイム性
CloudTrailAPI呼び出し履歴(誰が・いつ・何をした)12ヶ月15-30分遅延
VPC Flow Logsネットワーク通信(EC2間、インターネット)12ヶ月数分遅延
GuardDuty Findingsセキュリティイベント検出90日ほぼリアルタイム
EKS Audit LogsKubernetesクラスタ操作12ヶ月数分遅延
S3アクセスログバケット操作12ヶ月数時間遅延

ビヘイビアグラフ(Behavior Graph):これらのデータを統合して、Entity(IAMロール、EC2、ユーザー)の動作パターンを自動的に構築します。

ビヘイビアグラフの仕組み

CloudTrail + VPC Flow Logs + GuardDuty Findings → Detective処理 → ビヘイビアグラフ化

ビヘイビアグラフの構成:

  • ノード: Entity(IAMロール、EC2インスタンス、ユーザー、IPアドレス等)
  • エッジ: Entity間の関連性と通信パターン
  • 属性: 各EntityのAPI呼び出し、ネットワーク通信、時間帯

調査インターフェース(Timeline、Entity、Finding)

12ヶ月のデータ保持

  • 過去1年のAPI呼び出し履歴を保持
  • 異常が発生した場合、「通常と異なる」を判定するベースラインが存在
  • 例:「このIAMロールは通常1時間に100回のAPI呼び出しをするが、今日は10,000回」→異常判定

自動構築のタイムラインと遅延

イベント発生Detective取り込みビヘイビアグラフ反映
0分CloudTrail(15-30分後)15-30分
0分VPC Flow Logs(3-5分後)3-5分
0分GuardDuty(リアルタイム)1-2分

重要:最大48時間のデータ取り込み遅延が発生する可能性があり、リアルタイム検知用途には向きません。

Detective調査フロー(実践的な手順)

ステップ1:Finding → Entity を特定

GuardDutyが検出した疑わしいFindingから調査を開始します。

GuardDuty Finding 「EC2インスタンス i-1234567890abcdef0 が異常な API 呼び出しをしています」

↓ Detective でこのEC2インスタンスを Entity として開く

↓ 「このEC2インスタンスに紐付く IAM ロール」を特定

ステップ2:Timeline で時系列に追う

DetectiveのTimeline機能は、ある Entity(例:IAMロール)が「いつ・何をしたか」を時系列で表示します。

  • 2024-04-27 14:30:15 - DescribeInstances
  • 2024-04-27 14:30:18 - GetSecretValue(Secret名:db-password)
  • 2024-04-27 14:30:22 - CreateAccessKey(新しいIAMキー生成)
  • 2024-04-27 14:30:25 - AssumeRole(別のロールに切り替え)
  • 2024-04-27 14:30:30 - PutObject to s3://sensitive-bucket/

Timeline から読み取るポイント

  • 「normal」vs「unusual」ラベルが自動付与される
  • 関連するVPC Flow Logsの通信も表示される
  • CloudTrail log のリンクで詳細を確認可能

ステップ3:関連 Entity をたどる

不正 IAM Role

  • Created Access Key #1
  • Assumed by Role B
  • Communicated with EC2 instance X
  • S3 bucket への PutObject

この関連性グラフがビヘイビアグラフの正体です。「点と線」で攻撃経路を可視化。

ステップ4:コンテキスト情報の収集

Detectiveは以下のコンテキストを自動収集:

  • IP Reputation:通信先IPが既知の悪意あるものか
  • First Seen:このEntity の動作が初めてか、通常か
  • AWS Account Context:複数アカウント環境での関連性
  • Resource Details:該当EC2のセキュリティグループ、タグ、VPC設定等

組織での委任管理者設定

複数AWSアカウントを組織管理している場合:

Organization 構成
├── Management Account(委任管理者の設定はここで)
│   ├── AWS Organizations 設定
│   └── Detective → 委任管理者を指定
│       ↓
├── Security Account(委任管理者)
│   ├── Detective 有効化
│   ├── 全 Member Account のデータを一元管理
│   └── 全組織メンバーの Findings を閲覧・調査
│       ↓
├── Workload Account A
│   ├── Detective 有効化済み(Security Account に統治)
│   └── データは自動的に委任管理者へ送信

└── Workload Account B
    └── 同上

委任管理者のメリット

  • セキュリティチームが全アカウントを一箇所から監視
  • 権限管理が効率的(各アカウントのIAMを管理不要)
  • クロスアカウント攻撃の可視化

試験で狙われるポイント

Q: Detective が検出(detection)できない理由は?

  • A: Detectiveは検出ツールではなく調査ツール。GuardDutyが検出したFindingを分析するもの。
  • GuardDutyがFindingを生成しなければ、Detectiveには何も出現しない。

Q: Detective のデータ遅延は何が原因?

  • A: CloudTrail、VPC Flow Logs、GuardDuty の各ソースが非同期に Detective へデータ送信するため。
  • 最大48時間の遅延あり。リアルタイムセキュリティ対応には SecurityHub + EventBridge を組み合わせるべき。

Q: ビヘイビアグラフはいつリセット?

  • A: リセットされない。12ヶ月の完全履歴を保持し、「通常との差分」を判定し続ける。
  • 新しいAWSアカウントをDetectiveに追加すると、初期2週間はグラフが不完全。

できないこと・制約

制約理由と対処
リアルタイム検出不可データ取り込みに15-48時間の遅延。即座に対応が必要な場合は GuardDuty + EventBridge + Lambda を使う
アラート機能なしDetective はダッシュボード型調査ツール。通知が必要な場合は SecurityHub → EventBridge で実装
GuardDuty有効化が前提Detective 単体では何もできない。GuardDuty のFinding がなければ調査対象がない
データ保持期間固定12ヶ月で完全削除(カスタマイズ不可)。コンプライアンス要件で長期保持が必要な場合は S3 アーカイブが必須
ルールカスタマイズ不可「異常」「通常」の判定ロジックは AWS が決める。業務ロジックに合わせた調整は不可
API が限定的ListFindings, GetFindings しか公開されていない。Timeline や Graph データへのAPI アクセスなし(Web UI のみ)

EventBridge — セキュリティイベント駆動自動化

EventBridge の役割(セキュリティコンテキスト)

EventBridge は「セキュリティイベントが起きたら即座に反応する」ための仕組みです。

Guard Duty Finding 検出(ほぼリアルタイム)

↓ EventBridge ルール マッチング(<1秒)

ターゲット実行:

  • Lambda(自動修復)
  • SNS(通知)
  • SystemsManager(Playbook 実行)
  • SQS(バッチ処理)

Detective vs EventBridge

  • Detective:「何が起きたのか調べる」(フォレンジック)
  • EventBridge:「起きたら即座に対応する」(自動化)

インシデントレスポンスパイプライン図

Loading diagram...

セキュリティイベント駆動アーキテクチャパターン

パターン1:GuardDuty Finding → 自動修復

{
  "Name": "GuardDutyEC2Remediation",
  "EventBusName": "default",
  "EventPattern": {
    "source": ["aws.guardduty"],
    "detail-type": ["GuardDuty Finding"],
    "detail": {
      "severity": [7, 7.1, 7.2, 7.3, 7.4, 7.5, 7.6, 7.7, 7.8, 7.9, 8, 8.1, 8.2, 8.3, 8.4, 8.5, 8.6, 8.7, 8.8, 8.9],
      "type": ["EC2/UnauthorizedAccess*", "EC2/MaliciousIPCaller*"]
    }
  },
  "State": "ENABLED",
  "Targets": [
    {
      "Arn": "arn:aws:lambda:ap-northeast-1:123456789012:function:GuardDutyAutoRemediate",
      "RoleArn": "arn:aws:iam::123456789012:role/EventBridgeExecutionRole",
      "InputTransformer": {
        "InputPathsMap": {
          "detail": "$.detail",
          "instanceId": "$.detail.resource.instanceDetails.instanceId"
        },
        "InputTemplate": "{\"instanceId\": <instanceId>, \"findingDetail\": <detail>}"
      }
    }
  ]
}

ここで重要な設定

  1. Severity フィルタリング

    • 7 以上(High 以上)だけを対象
    • Low なFinding では Lambda は実行されない
  2. Finding Type フィルタリング

    • EC2/UnauthorizedAccess*:不正アクセス関連
    • EC2/MaliciousIPCaller*:悪意あるIP からの通信
    • ワイルドカード(*)で同じ接頭辞のすべてを対象化
  3. InputTransformer

    • GuardDuty Finding の JSON を構造化
    • Lambda へ渡す JSON の形式をカスタマイズ

パターン2:Security Hub Finding → マルチチャネル通知

{
  "Name": "SecurityHubFindingNotification",
  "EventPattern": {
    "source": ["aws.securityhub"],
    "detail-type": ["Security Hub Findings - Imported"],
    "detail": {
      "severity": {
        "label": ["CRITICAL", "HIGH"]
      },
      "compliance": {
        "status": ["FAILED"]
      }
    }
  },
  "Targets": [
    {
      "Arn": "arn:aws:sns:ap-northeast-1:123456789012:SecurityAlerts",
      "InputTransformer": {
        "InputPathsMap": {
          "title": "$.detail.title",
          "severity": "$.detail.severity.label",
          "resource": "$.detail.resources[0].id",
          "awsAccountId": "$.detail.awsAccountId"
        },
        "InputTemplate": "\"Alert: <title> (Severity: <severity>) | Resource: <resource> | Account: <awsAccountId>\""
      }
    },
    {
      "Arn": "arn:aws:lambda:ap-northeast-1:123456789012:function:SlackNotifier",
      "RoleArn": "arn:aws:iam::123456789012:role/EventBridgeExecutionRole"
    }
  ]
}

複数ターゲットの効果

  • SNS:メール・SMS通知
  • Lambda:Slack、Datadog、Splunk への連携
  • 同時実行で、複数の通知チャネルをカバー

パターン3:Config Rule 非準拠 → 自動修復 DLQ

{
  "Name": "ConfigNonCompliantRemediation",
  "EventPattern": {
    "source": ["aws.config"],
    "detail-type": ["Config Rules – Compliance Change"],
    "detail": {
      "messageType": ["ComplianceChangeNotification"],
      "configRuleName": ["s3-bucket-server-side-encryption-enabled"],
      "complianceType": ["NON_COMPLIANT"]
    }
  },
  "Targets": [
    {
      "Arn": "arn:aws:ssm:ap-northeast-1:123456789012:automation-definition/S3EncryptionRemediation",
      "RoleArn": "arn:aws:iam::123456789012:role/EventBridgeExecutionRole",
      "InputTransformer": {
        "InputPathsMap": {
          "bucketName": "$.detail.resourceName"
        },
        "InputTemplate": "{\"BucketName\": [\"<bucketName>\"]}"
      },
      "DeadLetterConfig": {
        "Arn": "arn:aws:sqs:ap-northeast-1:123456789012:remediation-dlq"
      }
    }
  ]
}

DLQ(Dead Letter Queue)

  • 修復に失敗した場合、メッセージをここに溜める
  • セキュリティチームが後で手動確認・対応できる

パターン4:IAM Access Analyzer Finding → アクセス権を自動削除

{
  "Name": "AccessAnalyzerAutoRemediate",
  "EventPattern": {
    "source": ["aws.access-analyzer"],
    "detail-type": ["Access Analyzer Finding"],
    "detail": {
      "findingType": ["UnexpectedAccess", "ExternalAccess"],
      "status": ["ACTIVE"]
    }
  },
  "Targets": [
    {
      "Arn": "arn:aws:lambda:ap-northeast-1:123456789012:function:RemoveExternalAccess",
      "RoleArn": "arn:aws:iam::123456789012:role/EventBridgeExecutionRole"
    }
  ]
}

パターン5:CloudTrail API Call → リアルタイムセキュリティ検知

{
  "Name": "SuspiciousAPICallDetection",
  "EventPattern": {
    "source": ["aws.ec2"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
      "eventName": ["AuthorizeSecurityGroupIngress", "ModifyDBInstance"],
      "errorCode": [{ "exists": false }],
      "requestParameters": {
        "cidrIp": ["0.0.0.0/0"]
      }
    }
  },
  "Targets": [
    {
      "Arn": "arn:aws:sns:ap-northeast-1:123456789012:SecurityOpsAlert",
      "InputTransformer": {
        "InputPathsMap": {
          "eventName": "$.detail.eventName",
          "userIdentity": "$.detail.userIdentity.principalId",
          "sourceIP": "$.detail.sourceIPAddress",
          "timestamp": "$.detail.eventTime"
        },
        "InputTemplate": "\"ALERT: Suspicious <eventName> from <userIdentity> (IP: <sourceIP>) at <timestamp>\""
      }
    }
  ]
}

重要な検知ロジック

  • イベント:AuthorizeSecurityGroupIngress(セキュリティグループのルール追加)
  • 条件:CIDR = 0.0.0.0/0(全世界からのアクセス許可)
  • エラーなし:=== 実際に実行された

ルール設計パターン

イベントパターンマッチング

{
  "EventPattern": {
    "source": ["aws.guardduty"],
    "detail-type": ["GuardDuty Finding"],
    "detail": {
      "severity": [7, 8, 8.1],
      "type": ["EC2/UnauthorizedAccess.EC2.RDP.BruteForce"],
      "resource": {
        "instanceDetails": {
          "instanceId": ["i-12345*"]
        }
      }
    }
  }
}

マッチングのルール

パターンマッチング使い分け
"severity": [7, 8, 9]値のいずれか(OR)複数の Severity レベルを対象
"severity": [{"numeric": [">=", 7]}]数値比較7 以上のすべての Severity
"type": ["EC2/UnauthorizedAccess*"]ワイルドカード接頭辞で複数の Finding Type
"errorCode": [{"exists": false}]存在判定API が正常実行(errorCode がない)
"requestParameters": {"cidrIp": ["0.0.0.0/0"]}ネストされた値JSON内の深い階層を指定

Input Transformer でメッセージカスタマイズ

{
  "InputTransformer": {
    "InputPathsMap": {
      "severity": "$.detail.severity",
      "type": "$.detail.type",
      "instanceId": "$.detail.resource.instanceDetails.instanceId",
      "accountId": "$.detail.accountId",
      "region": "$.detail.region",
      "createdAt": "$.detail.createdAt"
    },
    "InputTemplate": "\"Alert\\nSeverity: <severity>\\nType: <type>\\nInstance: <instanceId>\\nAccount: <accountId>\\nRegion: <region>\\nTime: <createdAt>\""
  }
}

実際の Lambda へ渡されるペイロード

{
  "severity": 8,
  "type": "EC2/UnauthorizedAccess.EC2.RDP.BruteForce",
  "instanceId": "i-1234567890abcdef0",
  "accountId": "123456789012",
  "region": "ap-northeast-1",
  "createdAt": "2024-04-27T14:30:00Z"
}

クロスアカウント・クロスリージョンイベント

クロスアカウントイベントバス

# Security Account(中央)
EventBus: SecurityCentral
Policy:
  Effect: Allow
  Principal:
    AWS:
      - arn:aws:iam::111111111111:root
      - arn:aws:iam::222222222222:root
  Action: events:PutEvents
  Resource: arn:aws:events:*:123456789012:event-bus/SecurityCentral

# Workload Account A
EventBridge Rule:
  EventBusName: default
  EventPattern:
    source: ["aws.guardduty"]
  Targets:
    - Arn: arn:events:ap-northeast-1:123456789012:event-bus/SecurityCentral
      RoleArn: arn:aws:iam::111111111111:role/CrossAccountEventBridgeRole

フロー

Workload Account A
  GuardDuty Finding発生

EventBridge Rule(default bus)

PutEvents to Security Account(SecurityCentral bus)

Security Account
  一元的にルール適用・ログ保存

クロスリージョンイベント

{
  "Name": "MultiRegionSecurityAlert",
  "EventBusName": "central-event-bus",
  "EventPattern": {
    "source": ["aws.guardduty"],
    "region": ["ap-northeast-1", "us-east-1", "eu-west-1"]
  },
  "Targets": [
    {
      "Arn": "arn:aws:lambda:ap-northeast-1:123456789012:function:GlobalSecurityHandler"
    }
  ]
}

利用例

  • 複数リージョンの GuardDuty Finding を 1つのLambda に集約
  • セキュリティチームが 1つのダッシュボードで監視

試験で狙われるポイント

Q: EventBridge ルールが最大いくつ作成可能か?

  • A: AWS Account あたり 300個(ソフト制限)。追加リクエストで増設可能。

Q: EventBridge の ターゲット数制限は?

  • A: ルール1個あたり最大 100個のターゲット。複数SNSトピック、複数Lambda、SQS、など。

Q: イベント配信の遅延は?

  • A: 通常 <1秒。ただしターゲット側の処理時間は別。例:Lambda cold start(2-3秒)。

Q: EventPattern で「AND条件」を書く方法は?

  • A: JSON のネストで自動的に AND になる。例:source AND detail-type AND detail.severity

Q: EventBridge が「イベントを見落とす」可能性は?

  • A: ないと言える。AWS 保証:最低1回の配信(at-least-once semantics)。

できないこと・制約

制約理由と対処
ルール数 制限300個/Account(ソフト上限)。超える場合は複数 EventBus を分割・スケーリング
ターゲット数 制限100個/ルール。多数の通知先が必要な場合は SNS → 複数Subscriber で中継
配信遅延通常 <1秒。ただし DLQ 送信失敗時は自動 retry。最大24時間 retry
イベントサイズ制限最大 256KB/イベント。大規模な Finding は S3 参照リンク化で対応
複雑な条件判定EventPattern は JSON マッチのみ。複雑な ビジネスロジック は Lambda で実装
Cron スケジュールイベント駆動型。定期的なスキャン(例:1時間ごと)は EventBridge Scheduler を別途使用

インシデントレスポンスパターン

自動修復パイプラインの設計

アーキテクチャ全体

セキュリティイベント検出(GuardDuty/SecurityHub/Config)

EventBridge Rule マッチング

Lambda 関数(修復ロジック)

修復実行:
  ├─ EC2隔離(SG切り替え)
  ├─ IAMキー無効化
  ├─ S3 パブリック化解除
  ├─ EBS スナップショット(フォレンジック)
  └─ CloudWatch ログ記録・通知

Systems Manager Automation(複雑な修復フロー)

監査ログ → CloudTrail, Config, Detective で事後確認

パターン1:EC2隔離(SG切り替え)

シナリオ

  • GuardDutyが「EC2 i-123456 が不正アクセスされている」と検出
  • 即座に該当EC2を隔離(外部通信禁止)
  • フォレンジック用に「隔離状態」を保持

Lambda コード

import boto3
import json

ec2_client = boto3.client('ec2')
cloudwatch = boto3.client('cloudwatch')

ISOLATION_SG_ID = 'sg-isolation-no-ingress'  # 事前に作成しておく
ISOLATION_ROLE_TAG = 'SecurityStatus=Isolated'

def lambda_handler(event, context):
    """
    EC2インスタンスを隔離状態に切り替える
    """
    
    # EventBridge からのペイロード
    detail = event.get('detail', {})
    
    # GuardDuty Finding から インスタンスID を抽出
    instance_id = detail.get('resource', {}).get('instanceDetails', {}).get('instanceId')
    region = detail.get('region')
    severity = detail.get('severity')
    finding_type = detail.get('type')
    
    if not instance_id:
        return {
            'statusCode': 400,
            'body': json.dumps('Instance ID not found in Finding')
        }
    
    try:
        # ステップ1:現在のセキュリティグループを取得・記録
        describe_response = ec2_client.describe_instances(InstanceIds=[instance_id])
        instance = describe_response['Reservations'][0]['Instances'][0]
        original_sgs = [sg['GroupId'] for sg in instance['SecurityGroups']]
        
        # タグに記録(復旧時に参照)
        ec2_client.create_tags(
            Resources=[instance_id],
            Tags=[
                {
                    'Key': 'OriginalSecurityGroups',
                    'Value': ','.join(original_sgs)
                },
                {
                    'Key': 'IsolationTime',
                    'Value': str(datetime.datetime.utcnow().isoformat())
                },
                {
                    'Key': 'GuardDutyFindingType',
                    'Value': finding_type
                },
                {
                    'Key': 'SecurityStatus',
                    'Value': 'Isolated'
                }
            ]
        )
        
        # ステップ2:隔離 SG に切り替え
        ec2_client.modify_instance_attribute(
            InstanceId=instance_id,
            Groups=[ISOLATION_SG_ID]
        )
        
        # ステップ3:CloudWatch メトリクス記録
        cloudwatch.put_metric_data(
            Namespace='Security/AutoRemediation',
            MetricData=[
                {
                    'MetricName': 'EC2Isolated',
                    'Value': 1,
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'InstanceId', 'Value': instance_id},
                        {'Name': 'Region', 'Value': region},
                        {'Name': 'Severity', 'Value': str(severity)}
                    ]
                }
            ]
        )
        
        # ステップ4:通知
        sns_client = boto3.client('sns')
        sns_client.publish(
            TopicArn='arn:aws:sns:ap-northeast-1:123456789012:SecurityAlerts',
            Subject=f'[AUTOMATED] EC2 Isolated: {instance_id}',
            Message=f"""
EC2 Instance が自動隔離されました

Instance ID: {instance_id}
Region: {region}
Severity: {severity}
Finding Type: {finding_type}

元のセキュリティグループ: {', '.join(original_sgs)}
現在のセキュリティグループ: {ISOLATION_SG_ID}

フォレンジック検査後、セキュリティチームが復旧判断を行います。
            """
        )
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Instance isolated successfully',
                'instance_id': instance_id,
                'original_sgs': original_sgs,
                'isolation_sg': ISOLATION_SG_ID
            })
        }
    
    except Exception as e:
        # 失敗時は DLQ へ(EventBridge の DeadLetterConfig で設定)
        print(f'Error isolating instance: {str(e)}')
        raise

隔離用 Security Group の事前設定

# Terraform で定義
resource "aws_security_group" "isolation" {
  name        = "isolation-no-ingress"
  description = "Isolation SG - No inbound traffic allowed"
  vpc_id      = aws_vpc.main.id

  # Ingress なし(全て denied)
  # Egress のみ許可(CloudWatch Logs, SNS通知のため)
  egress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name = "isolation-no-ingress"
    Purpose = "Forensics"
  }
}

パターン2:IAM キー無効化(即座対応)

シナリオ

  • Access Analyzer が「IAM キーが外部に公開されている」と検出
  • 即座にキーを無効化・削除
  • 新しいキーで置き換え可能か自動判定

Lambda コード

import boto3
import json
from datetime import datetime

iam_client = boto3.client('iam')
sns_client = boto3.client('sns')

def lambda_handler(event, context):
    """
    公開された IAM Access Key を無効化
    """
    
    detail = event.get('detail', {})
    finding_type = detail.get('findingType')  # UnexpectedAccess, ExternalAccess
    resource = detail.get('resource', {})
    
    # リソース情報から IAM 情報を抽出
    principal = resource.get('principalArn')  # arn:aws:iam::123456789012:user/username
    
    if not principal:
        return {'statusCode': 400, 'body': 'Principal not found'}
    
    # IAM User / Role を解析
    principal_type, principal_name = parse_principal(principal)
    
    try:
        if principal_type == 'user':
            # ステップ1:該当ユーザーのアクセスキーをすべて列挙
            keys_response = iam_client.list_access_keys(UserName=principal_name)
            access_keys = keys_response.get('AccessKeyMetadata', [])
            
            # ステップ2:アクティブなキーを無効化
            deactivated_keys = []
            for key in access_keys:
                if key['Status'] == 'Active':
                    iam_client.update_access_key(
                        UserName=principal_name,
                        AccessKeyId=key['AccessKeyId'],
                        Status='Inactive'
                    )
                    deactivated_keys.append(key['AccessKeyId'])
            
            # ステップ3:古いキー(7日以上前)を削除
            deletion_threshold = (datetime.utcnow() - timedelta(days=7)).timestamp()
            for key in access_keys:
                key_creation_time = key['CreateDate'].timestamp()
                if key_creation_time < deletion_threshold and key['AccessKeyId'] not in deactivated_keys:
                    iam_client.delete_access_key(
                        UserName=principal_name,
                        AccessKeyId=key['AccessKeyId']
                    )
                    deactivated_keys.append(key['AccessKeyId'])
            
            # ステップ4:新しいキー生成(リソースが現在のキーに依存している場合)
            # ※ 実装は慎重に。アプリケーション停止を避ける必要あり
            new_key = iam_client.create_access_key(UserName=principal_name)
            new_access_key_id = new_key['AccessKey']['AccessKeyId']
            new_secret_access_key = new_key['AccessKey']['SecretAccessKey']
            
            # ステップ5:Secrets Manager に新しいキーを保存
            secrets_client = boto3.client('secretsmanager')
            try:
                secrets_client.create_secret(
                    Name=f'iam-key-rotation/{principal_name}/{new_access_key_id}',
                    SecretString=json.dumps({
                        'AccessKeyId': new_access_key_id,
                        'SecretAccessKey': new_secret_access_key,
                        'UserName': principal_name,
                        'RotatedAt': datetime.utcnow().isoformat(),
                        'Reason': f'Automatic rotation due to {finding_type}'
                    })
                )
            except Exception as e:
                print(f'Failed to save new key to Secrets Manager: {e}')
            
            # ステップ6:通知
            sns_client.publish(
                TopicArn='arn:aws:sns:ap-northeast-1:123456789012:SecurityOpsAlert',
                Subject=f'[CRITICAL] IAM Keys Deactivated: {principal_name}',
                Message=f"""
IAM Access Key が自動無効化されました(セキュリティ上の理由)

ユーザー: {principal_name}
Finding Type: {finding_type}
無効化されたキー: {', '.join(deactivated_keys)}

新しいキーが Secrets Manager に保存されました:
- Secret Name: iam-key-rotation/{principal_name}/{new_access_key_id}

アクションが必要な場合:
1. セキュリティチームが新しいキーを検証
2. アプリケーションへデプロイ
3. 古いキーの削除承認

⚠️ このユーザーを使用しているアプリケーションが停止している可能性があります。
                """
            )
            
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'IAM keys deactivated and rotated',
                    'user': principal_name,
                    'deactivated_keys': deactivated_keys,
                    'new_key_created': new_access_key_id
                })
            }
    
    except Exception as e:
        print(f'Error deactivating IAM keys: {e}')
        raise

def parse_principal(principal_arn):
    """
    arn:aws:iam::123456789012:user/username → ('user', 'username')
    """
    parts = principal_arn.split(':')
    if len(parts) >= 6:
        resource_type_and_name = parts[5].split('/')
        if len(resource_type_and_name) == 2:
            return resource_type_and_name
    return None, None

パターン3:S3 パブリック化解除

シナリオ

  • Config Rule が「S3 バケットがパブリック公開設定になっている」と検出
  • 即座に BlockPublicAccess を強制

EventBridge Rule + Systems Manager Automation

{
  "Name": "S3PublicAccessRemediation",
  "EventPattern": {
    "source": ["aws.config"],
    "detail-type": ["Config Rules – Compliance Change"],
    "detail": {
      "configRuleName": [
        "s3-bucket-public-read-prohibited",
        "s3-bucket-public-write-prohibited"
      ],
      "complianceType": ["NON_COMPLIANT"]
    }
  },
  "Targets": [
    {
      "Arn": "arn:aws:ssm:ap-northeast-1:123456789012:automation-definition/S3PublicAccessBlockRemediaton:1",
      "RoleArn": "arn:aws:iam::123456789012:role/EventBridgeSSMAutomationRole",
      "InputTransformer": {
        "InputPathsMap": {
          "bucketName": "$.detail.resourceName"
        },
        "InputTemplate": "{\"BucketName\": [\"<bucketName>\"]}"
      }
    }
  ]
}

Systems Manager Automation Document(YAML)

schemaVersion: '0.3'
description: 'Block public access on S3 bucket'
parameters:
  BucketName:
    type: StringList
    description: 'S3 Bucket names to block public access'
mainSteps:
  - name: GetBucketPolicy
    action: 'aws:executeAwsApi'
    inputs:
      Service: s3
      Api: GetBucketPublicAccessBlock
      Bucket: '{{ BucketName[0] }}'
    outputs:
      - Name: PublicAccessBlockConfiguration
        Selector: '$.PublicAccessBlockConfiguration'
        Type: MapList
    onFailure: Continue
  
  - name: ApplyBlockPublicAccess
    action: 'aws:executeAwsApi'
    inputs:
      Service: s3
      Api: PutBucketPublicAccessBlock
      Bucket: '{{ BucketName[0] }}'
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
  
  - name: LogRemediation
    action: 'aws:executeAwsApi'
    inputs:
      Service: logs
      Api: PutLogEvents
      LogGroupName: '/aws/remediation/s3-public-access'
      LogStreamName: '{{ BucketName[0] }}'
      LogEvents:
        - Message: 'S3 bucket public access blocked'
          Timestamp: '{{ global:DATE_TIME }}'

パターン4:EBS スナップショット取得(フォレンジック)

シナリオ

  • EC2が隔離されたら、すぐにディスクをスナップショット取得
  • 後で別の分析インスタンスにアタッチして調査

Lambda コード

import boto3
import json
from datetime import datetime

ec2_client = boto3.client('ec2')
sns_client = boto3.client('sns')

def lambda_handler(event, context):
    """
    隔離された EC2 の EBS ボリュームをスナップショット取得
    """
    
    detail = event.get('detail', {})
    instance_id = detail.get('resource', {}).get('instanceDetails', {}).get('instanceId')
    severity = detail.get('severity')
    
    if not instance_id:
        return {'statusCode': 400}
    
    try:
        # ステップ1:インスタンスの EBS ボリュームを取得
        instances = ec2_client.describe_instances(InstanceIds=[instance_id])
        instance = instances['Reservations'][0]['Instances'][0]
        
        volumes = []
        for bdm in instance.get('BlockDeviceMappings', []):
            volume_id = bdm.get('Ebs', {}).get('VolumeId')
            if volume_id:
                volumes.append(volume_id)
        
        # ステップ2:各ボリュームのスナップショットを取得
        snapshots = []
        for volume_id in volumes:
            snapshot_response = ec2_client.create_snapshot(
                VolumeId=volume_id,
                Description=f'Forensic snapshot: Instance {instance_id} (Severity: {severity})',
                TagSpecifications=[
                    {
                        'ResourceType': 'snapshot',
                        'Tags': [
                            {'Key': 'SourceInstance', 'Value': instance_id},
                            {'Key': 'ForensicAnalysis', 'Value': 'True'},
                            {'Key': 'SnapshotTime', 'Value': datetime.utcnow().isoformat()},
                            {'Key': 'Severity', 'Value': str(severity)}
                        ]
                    }
                ]
            )
            snapshots.append(snapshot_response['SnapshotId'])
        
        # ステップ3:通知
        sns_client.publish(
            TopicArn='arn:aws:sns:ap-northeast-1:123456789012:ForensicTeam',
            Subject=f'[FORENSICS] EBS Snapshot Created: {instance_id}',
            Message=f"""
EBS スナップショットが取得されました(フォレンジック用)

Instance ID: {instance_id}
Snapshots: {', '.join(snapshots)}
Source Volumes: {', '.join(volumes)}

スナップショットは Forensic Analysis Account へコピーしてください
            """
        )
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'instance_id': instance_id,
                'snapshots': snapshots
            })
        }
    
    except Exception as e:
        print(f'Error creating snapshots: {e}')
        raise

フォレンジック用 VPC 設計

フォレンジック用VPC設計

Forensic VPC のセキュリティ設定

# Terraform
resource "aws_vpc" "forensic" {
  cidr_block           = "10.200.0.0/16"
  enable_dns_hostnames = true

  tags = {
    Name = "forensic-analysis-vpc"
    Purpose = "Incident Response & Forensics"
  }
}

resource "aws_security_group" "forensic_analysis" {
  name        = "forensic-analysis"
  description = "Forensic analysis tools - No external access"
  vpc_id      = aws_vpc.forensic.id

  # SSH: セキュリティチームの固定IP のみ
  ingress {
    from_port   = 22
    to_port     = 22
    protocol    = "tcp"
    cidr_blocks = ["203.0.113.0/24"]  # Security Team VPN
  }

  # S3 へのアップロード(証拠保存)
  egress {
    from_port       = 443
    to_port         = 443
    protocol        = "tcp"
    prefix_list_ids = [aws_ec2_managed_prefix_list.s3_prefix_list.id]
  }

  # 外部 Internet アクセス禁止
  # VPC Peering も禁止
}

# S3 Bucket for Forensic Evidence
resource "aws_s3_bucket" "forensic_evidence" {
  bucket = "forensic-evidence-${data.aws_caller_identity.current.account_id}"

  tags = {
    Purpose = "Evidence Storage"
    Classification = "Confidential"
  }
}

# Versioning & MFA Delete(証拠の完全性保証)
resource "aws_s3_bucket_versioning" "forensic_evidence" {
  bucket = aws_s3_bucket.forensic_evidence.id
  versioning_configuration {
    status     = "Enabled"
    mfa_delete = "Enabled"
  }
}

# Encryption(保存時)
resource "aws_s3_bucket_server_side_encryption_configuration" "forensic_evidence" {
  bucket = aws_s3_bucket.forensic_evidence.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.forensic.arn
    }
  }
}

# Block all public access
resource "aws_s3_bucket_public_access_block" "forensic_evidence" {
  bucket = aws_s3_bucket.forensic_evidence.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

Systems Manager Automation 連携

複雑なマルチステップの修復を自動化:

# Example: EC2隔離 → スナップショット → 通知 → 手動承認
schemaVersion: '0.3'
description: 'Comprehensive EC2 Incident Response'
assumeRole: '{{ AutomationAssumeRole }}'

parameters:
  InstanceId:
    type: String
    description: 'Target EC2 Instance ID'
  
  AutomationAssumeRole:
    type: String
    description: 'IAM role for automation'

mainSteps:
  # Step 1: Isolate EC2
  - name: IsolateEC2
    action: 'aws:executeScript'
    inputs:
      Runtime: python3.8
      Handler: isolate_instance
      Script: |
        def isolate_instance(events, context):
            import boto3
            ec2 = boto3.client('ec2')
            instance_id = events['InstanceId']
            ec2.modify_instance_attribute(
                InstanceId=instance_id,
                Groups=['sg-isolation']
            )
            return {'status': 'isolated'}
      InputPayload:
        InstanceId: '{{ InstanceId }}'

  # Step 2: Create EBS Snapshot
  - name: CreateSnapshot
    action: 'aws:executeAwsApi'
    inputs:
      Service: ec2
      Api: DescribeInstances
      InstanceIds:
        - '{{ InstanceId }}'
    outputs:
      - Name: VolumeIds
        Selector: $.Reservations[0].Instances[0].BlockDeviceMappings[*].Ebs.VolumeId
        Type: MapList

  # Step 3: Manual Approval before deletion
  - name: ManualApproval
    action: 'aws:approve'
    inputs:
      Approvers:
        - 'arn:aws:iam::123456789012:role/SecurityLeadRole'
      Message: 'Review forensic analysis. Approve to terminate instance.'

  # Step 4: Terminate Instance (if approved)
  - name: TerminateInstance
    action: 'aws:executeAwsApi'
    inputs:
      Service: ec2
      Api: TerminateInstances
      InstanceIds:
        - '{{ InstanceId }}'
    onFailure: Continue

試験で狙われるポイント

Q: 自動修復ルールが「過度に削除」しないようにするには?

  • A: EventPattern で Severity フィルタを設定し、Low Finding では実行しない。また Manual Approval ステップで検証。

Q: IAMキーが無効化された場合、アプリケーションが停止するリスクは?

  • A: EventBridge で新しいキーを Secrets Manager に保存し、アプリケーションが定期的に取得できる仕組みが必要。

Q: フォレンジック用 VPC が本体ネットワークと通信してはいけないのはなぜ?

  • A: 攻撃者が Forensic VPC へも侵入する可能性を排除するため(Air-gapped network)。

Q: Config Rule非準拠の修復に DLQ を使う理由は?

  • A: 修復が失敗した場合、セキュリティチームが SQS メッセージを確認して手動対応。自動修復の盲目化を避ける。

まとめ:Detective + EventBridge + Automation の統合フロー

┌─────────────────────────────────────────────────────────────────┐
│              セキュリティイベント統合ワークフロー               │
└─────────────────────────────────────────────────────────────────┘

【検出フェーズ】
GuardDuty / SecurityHub / Config Rule
    ↓ Finding 発行

【自動対応フェーズ】
EventBridge Rule マッチング

    ├─ Severity >= 7 → Lambda(自動修復)
    │   ├─ EC2隔離(SG切り替え)
    │   ├─ IAMキー無効化
    │   ├─ S3パブリック化解除
    │   └─ EBS snapshot取得

    ├─ その他 → SNS通知(セキュリティチーム)

    └─ Approval Required → Systems Manager(手動確認)

【調査フェーズ】
Detective(原因追跡)
    ├─ ビヘイビアグラフで攻撃経路を可視化
    ├─ Timeline で時系列に追跡
    └─ 関連Entity を追跡

【アーカイブ・監査フェーズ】
CloudTrail(完全記録)

Config(状態変更のトレース)

S3(長期保存)

実装時のチェックリスト

  • GuardDuty を有効化(Detective のデータソース)
  • Detective を有効化・ビヘイビアグラフ構築を確認
  • EventBridge ルールを複数段階で設計(Severity フィルタ)
  • Lambda 関数に IAM 最小権限を付与
  • DLQ(SQS)を構成(修復失敗のキャッチ)
  • Systems Manager Automation Document をテスト
  • Forensic VPC を Air-gapped で構成
  • 定期的にシミュレーション(Game Day)を実施
  • ログを CloudTrail → S3 でアーカイブ

SCS-C03 試験にて

このセクションは、「セキュリティイベント検出後の対応フロー」に関する出題が多いです。特に:

  1. Detective のデータ遅延:リアルタイム対応には向かない事実
  2. EventBridge のイベントパターン設計:複数条件の AND/OR 組み合わせ
  3. 自動修復 vs 手動承認:リスク評価に基づく設計判断
  4. Forensic VPC の隔離設計:Air-gapped の重要性
  5. クロスアカウント統治:Organization → Detective → EventBridge の連携

これらを組み合わせて、「検出 → 自動対応 → 調査 → 監査」の完全フローを設計できれば、本番環境でのセキュリティインシデント対応に対応できます。