AWSTemplateFormatVersion: '2010-09-09'
Description: >
  Distributed Low-Latency CQRS Architecture on AWS
  DynamoDB + DAX + Kinesis CDC による CQRS パターン。書き込み/読み込み分離で低レイテンシーを実現。

# ============================================================
# パラメータ定義
# ============================================================
Parameters:
  EnvironmentName:
    Type: String
    Default: dev
    AllowedValues: [dev, stg, prod]
    Description: デプロイ環境 (dev / stg / prod)

  VpcCidr:
    Type: String
    Default: '10.0.0.0/16'  # TODO: 実運用時に変更してください
    Description: VPC CIDR ブロック

# ============================================================
# リソース定義
# ============================================================
Resources:
  # ----- ネットワーク (DAX / ElastiCache 用) -----
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !Ref VpcCidr
      EnableDnsSupport: true
      EnableDnsHostnames: true
      Tags:
        - Key: Name
          Value: !Sub '${EnvironmentName}-cqrs-vpc'

  PrivateSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: '10.0.10.0/24'
      AvailabilityZone: !Select [0, !GetAZs '']
      Tags:
        - Key: Name
          Value: !Sub '${EnvironmentName}-private-1'

  PrivateSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: '10.0.11.0/24'
      AvailabilityZone: !Select [1, !GetAZs '']
      Tags:
        - Key: Name
          Value: !Sub '${EnvironmentName}-private-2'

  # ----- DynamoDB プライマリストア (Command 側) -----
  PrimaryTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub '${EnvironmentName}-cqrs-primary'
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: PK
          AttributeType: S
        - AttributeName: SK
          AttributeType: S
        - AttributeName: GSI1PK
          AttributeType: S
        - AttributeName: GSI1SK
          AttributeType: S
      KeySchema:
        - AttributeName: PK
          KeyType: HASH
        - AttributeName: SK
          KeyType: RANGE
      GlobalSecondaryIndexes:
        - IndexName: GSI1
          KeySchema:
            - AttributeName: GSI1PK
              KeyType: HASH
            - AttributeName: GSI1SK
              KeyType: RANGE
          Projection:
            ProjectionType: ALL
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      SSESpecification:
        SSEEnabled: true
      Tags:
        - Key: Pattern
          Value: CQRS-Command

  # ----- DAX キャッシュ (Query 側ホットリード) -----
  DAXSubnetGroup:
    Type: AWS::DAX::SubnetGroup
    Properties:
      SubnetGroupName: !Sub '${EnvironmentName}-dax-subnets'
      SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2

  DAXSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: DAX Cluster Security Group
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 8111
          ToPort: 8111
          CidrIp: !Ref VpcCidr

  DAXRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: dax.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: DAXDynamoDBAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:GetItem
                  - dynamodb:BatchGetItem
                  - dynamodb:Query
                  - dynamodb:Scan
                Resource:
                  - !GetAtt PrimaryTable.Arn
                  - !Sub '${PrimaryTable.Arn}/index/*'

  DAXCluster:
    Type: AWS::DAX::Cluster
    Properties:
      ClusterName: !Sub '${EnvironmentName}-cqrs-dax'
      NodeType: dax.t3.small  # TODO: 実運用時にサイジング変更してください
      ReplicationFactor: 2
      IAMRoleARN: !GetAtt DAXRole.Arn
      SubnetGroupName: !Ref DAXSubnetGroup
      SecurityGroupIds:
        - !Ref DAXSecurityGroup
      SSESpecification:
        SSEEnabled: true

  # ----- Kinesis CDC イベントストリーム -----
  CDCStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub '${EnvironmentName}-cqrs-cdc-stream'
      ShardCount: 2  # TODO: 実運用時にスループットに合わせて調整してください
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis
      RetentionPeriodHours: 24

  # ----- API Gateway -----
  RestApi:
    Type: AWS::ApiGateway::RestApi
    Properties:
      Name: !Sub '${EnvironmentName}-cqrs-api'
      Description: CQRS API (Command / Query エンドポイント)
      EndpointConfiguration:
        Types: [REGIONAL]

  # ----- Lambda: Command Handler (書き込み) -----
  CommandHandlerRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: DynamoDBWrite
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:PutItem
                  - dynamodb:UpdateItem
                  - dynamodb:DeleteItem
                Resource: !GetAtt PrimaryTable.Arn

  CommandHandler:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub '${EnvironmentName}-cqrs-command-handler'
      Runtime: python3.12
      Handler: index.handler
      Role: !GetAtt CommandHandlerRole.Arn
      Timeout: 30
      MemorySize: 256
      TracingConfig:
        Mode: Active
      Environment:
        Variables:
          TABLE_NAME: !Ref PrimaryTable
      Code:
        ZipFile: |
          # TODO: 実運用時にビジネスロジックを実装してください
          import boto3, json, os
          dynamodb = boto3.resource('dynamodb')
          table = dynamodb.Table(os.environ['TABLE_NAME'])
          def handler(event, context):
              body = json.loads(event['body'])
              table.put_item(Item=body)
              return {'statusCode': 200, 'body': json.dumps({'message': 'OK'})}

  # ----- Lambda: Query Handler (読み込み) -----
  QueryHandlerRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: DAXRead
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:GetItem
                  - dynamodb:Query
                  - dynamodb:Scan
                  - dax:GetItem
                  - dax:Query
                  - dax:Scan
                Resource:
                  - !GetAtt PrimaryTable.Arn
                  - !Sub '${PrimaryTable.Arn}/index/*'

  QueryHandler:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub '${EnvironmentName}-cqrs-query-handler'
      Runtime: python3.12
      Handler: index.handler
      Role: !GetAtt QueryHandlerRole.Arn
      Timeout: 10
      MemorySize: 256
      TracingConfig:
        Mode: Active
      VpcConfig:
        SubnetIds:
          - !Ref PrivateSubnet1
          - !Ref PrivateSubnet2
        SecurityGroupIds:
          - !Ref DAXSecurityGroup
      Environment:
        Variables:
          DAX_ENDPOINT: !GetAtt DAXCluster.ClusterDiscoveryEndpointURL
      Code:
        ZipFile: |
          # TODO: 実運用時にDAXクライアントを使用してください
          import json
          def handler(event, context):
              return {'statusCode': 200, 'body': json.dumps({'message': 'query result'})}

  # ----- Lambda: Sync Processor (CDC → 列指向DB同期) -----
  SyncProcessorRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole

  SyncProcessor:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub '${EnvironmentName}-cqrs-sync-processor'
      Runtime: python3.12
      Handler: index.handler
      Role: !GetAtt SyncProcessorRole.Arn
      Timeout: 60
      MemorySize: 512
      TracingConfig:
        Mode: Active
      Code:
        ZipFile: |
          # TODO: Keyspaces / Redshift への同期ロジックを実装してください
          import json
          def handler(event, context):
              for record in event['Records']:
                  print(json.dumps(record))
              return {'statusCode': 200}

  # DynamoDB Streams → Kinesis のパイプ接続
  # NOTE: DynamoDB Streams を Lambda で直接処理するパターンも可能
  StreamEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      EventSourceArn: !GetAtt CDCStream.Arn
      FunctionName: !Ref SyncProcessor
      StartingPosition: LATEST
      BatchSize: 100

  # ----- ElastiCache for Redis (セッションキャッシュ) -----
  RedisSubnetGroup:
    Type: AWS::ElastiCache::SubnetGroup
    Properties:
      Description: Redis Subnet Group
      SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2

  RedisSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Redis Security Group
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 6379
          ToPort: 6379
          CidrIp: !Ref VpcCidr

  RedisCluster:
    Type: AWS::ElastiCache::CacheCluster
    Properties:
      ClusterName: !Sub '${EnvironmentName}-cqrs-redis'
      Engine: redis
      CacheNodeType: cache.t3.small  # TODO: 実運用時にサイジング変更してください
      NumCacheNodes: 1
      CacheSubnetGroupName: !Ref RedisSubnetGroup
      VpcSecurityGroupIds:
        - !Ref RedisSecurityGroup

  # ----- CloudWatch + X-Ray -----
  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/cqrs/${EnvironmentName}'
      RetentionInDays: 30

  LatencyAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: !Sub '${EnvironmentName}-cqrs-query-latency-high'
      MetricName: Duration
      Namespace: AWS/Lambda
      Statistic: p99
      Period: 300
      EvaluationPeriods: 2
      Threshold: 100
      ComparisonOperator: GreaterThanThreshold
      Dimensions:
        - Name: FunctionName
          Value: !Ref QueryHandler
      AlarmActions:
        - !Ref AlarmTopic

  AlarmTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: !Sub '${EnvironmentName}-cqrs-alarms'
      KmsMasterKeyId: alias/aws/sns

  # NOTE: Amazon Keyspaces (Cassandra) と Redshift は
  # CloudFormation での定義が限定的なため、別途管理を推奨
  # - Keyspaces: AWS::Cassandra::Keyspace / AWS::Cassandra::Table
  # - Redshift Serverless: AWS::RedshiftServerless::Namespace / Workgroup

# ============================================================
# 出力
# ============================================================
Outputs:
  PrimaryTableArn:
    Description: DynamoDB プライマリテーブル ARN
    Value: !GetAtt PrimaryTable.Arn

  DAXEndpoint:
    Description: DAX クラスタエンドポイント
    Value: !GetAtt DAXCluster.ClusterDiscoveryEndpointURL

  CDCStreamArn:
    Description: Kinesis CDC ストリーム ARN
    Value: !GetAtt CDCStream.Arn

  ApiEndpoint:
    Description: API Gateway エンドポイント
    Value: !Sub 'https://${RestApi}.execute-api.${AWS::Region}.amazonaws.com/'

  RedisEndpoint:
    Description: ElastiCache Redis エンドポイント
    Value: !GetAtt RedisCluster.RedisEndpoint.Address

  CommandHandlerArn:
    Description: Command Handler Lambda ARN
    Value: !GetAtt CommandHandler.Arn

  QueryHandlerArn:
    Description: Query Handler Lambda ARN
    Value: !GetAtt QueryHandler.Arn
