AWSTemplateFormatVersion: '2010-09-09'
Description: >
  Lambda vs Kappa Data Architecture Comparison - Reference Implementation.
  Deploys both Lambda Architecture (MSK + Glue + Redshift + Flink + DynamoDB + Athena)
  and Kappa Architecture (MSK + Flink + OpenSearch + DynamoDB) side by side.

Parameters:
  Environment:
    Type: String
    Default: dev
    AllowedValues: [dev, stg, prod]
    Description: Deployment environment
  VpcId:
    Type: AWS::EC2::VPC::Id
    Description: "VPC ID for MSK and OpenSearch # TODO: replace with your VPC"
  SubnetIds:
    Type: List<AWS::EC2::Subnet::Id>
    # Exactly 3 subnets in 3 distinct AZs satisfies every consumer:
    #  - MSK requires NumberOfBrokerNodes (3 dev / 6 prod below) to be a
    #    multiple of the client-subnet count, which rules out 2 subnets.
    #  - Redshift Serverless workgroups require subnets across >= 3 AZs.
    Description: "Subnet IDs (3 subnets in 3 distinct AZs) # TODO: replace with your subnets"
  KafkaVersion:
    Type: String
    Default: "3.6.0"
    Description: Apache Kafka version for MSK

Conditions:
  # True when Environment == prod; drives the sizing !If branches on
  # MSK brokers/storage, Glue workers, and Redshift base capacity below.
  IsProd: !Equals [!Ref Environment, prod]

Resources:

  # ==========================================
  # Shared: Amazon MSK Cluster
  # ==========================================
  # Single Kafka cluster serving as the ingestion backbone for BOTH the
  # Lambda and Kappa pipelines. Prod: 6 x kafka.m5.large with 500 GiB
  # per broker; non-prod: 3 x kafka.t3.small with 100 GiB per broker.
  MSKCluster:
    Type: AWS::MSK::Cluster
    Properties:
      ClusterName: !Sub "data-arch-msk-${Environment}"
      KafkaVersion: !Ref KafkaVersion
      # NOTE(review): broker count must be a multiple of the number of
      # ClientSubnets; 3 and 6 both imply 3 subnets — confirm SubnetIds.
      NumberOfBrokerNodes: !If [IsProd, 6, 3]
      BrokerNodeGroupInfo:
        InstanceType: !If [IsProd, kafka.m5.large, kafka.t3.small]
        ClientSubnets: !Ref SubnetIds
        # NOTE(review): no SecurityGroups listed — MSK will attach the
        # VPC default security group; verify that is intended.
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: !If [IsProd, 500, 100]  # GiB per broker
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: TLS   # clients must connect over TLS only
          InCluster: true     # broker-to-broker traffic also encrypted
        EncryptionAtRest:
          DataVolumeKMSKeyId: alias/aws/kafka  # AWS-managed KMS key
      Tags:
        Architecture: LambdaAndKappa
        Environment: !Ref Environment

  # ==========================================
  # Lambda Architecture: Batch Layer
  # ==========================================
  # Master dataset bucket for the batch layer: KMS-encrypted, versioned,
  # all public access blocked; objects transition to STANDARD_IA after
  # 90 days to cut storage cost on cold data.
  DataLakeBucket:
    Type: AWS::S3::Bucket
    Properties:
      # Account ID in the name keeps the globally-unique bucket name stable
      # across accounts/environments.
      BucketName: !Sub "data-arch-lake-${AWS::AccountId}-${Environment}"
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          # No KMSMasterKeyID given, so the AWS-managed aws/s3 key is used.
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: aws:kms
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      VersioningConfiguration:
        Status: Enabled
      LifecycleConfiguration:
        Rules:
          - Id: TransitionToIA
            Status: Enabled
            Transitions:
              - StorageClass: STANDARD_IA
                TransitionInDays: 90

  # Glue Data Catalog database that the batch ETL job (below) and Athena
  # queries resolve table metadata against.
  GlueDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: !Sub "data_arch_${Environment}"
        Description: "Glue catalog for Lambda Architecture batch processing"

  # Spark ETL job implementing the Lambda batch layer. Prod runs 10 G.1X
  # workers; non-prod runs the 2-worker minimum.
  GlueBatchJob:
    Type: AWS::Glue::Job
    Properties:
      Name: !Sub "data-arch-batch-etl-${Environment}"
      Role: !GetAtt GlueJobRole.Arn
      Command:
        Name: glueetl
        # NOTE: this template creates the bucket empty — upload
        # scripts/batch-etl.py before the first run.
        ScriptLocation: !Sub "s3://${DataLakeBucket}/scripts/batch-etl.py"
        PythonVersion: "3"
      DefaultArguments:
        "--job-language": python
        "--enable-metrics": "true"
        "--enable-continuous-cloudwatch-log": "true"
      GlueVersion: "4.0"
      # G.025X is only valid for streaming jobs (Command.Name: gluestreaming);
      # glueetl jobs accept G.1X/G.2X/G.4X/G.8X. Use G.1X everywhere and
      # scale down via NumberOfWorkers in non-prod instead.
      WorkerType: G.1X
      NumberOfWorkers: !If [IsProd, 10, 2]

  # Execution role for GlueBatchJob: the AWS-managed Glue service policy
  # plus read/write access scoped to the data-lake bucket only.
  GlueJobRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: glue.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
      Policies:
        - PolicyName: S3Access
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: ["s3:GetObject", "s3:PutObject", "s3:ListBucket"]
                Resource:
                  # Bucket ARN for ListBucket; object ARNs for Get/Put.
                  - !GetAtt DataLakeBucket.Arn
                  - !Sub "${DataLakeBucket.Arn}/*"

  # Redshift Serverless (Batch Views)
  # The namespace holds database objects and credentials; compute comes
  # from the workgroup below. The admin password is resolved from Secrets
  # Manager at deploy time — the secret must exist before stack creation.
  RedshiftNamespace:
    Type: AWS::RedshiftServerless::Namespace
    Properties:
      NamespaceName: !Sub "data-arch-${Environment}"
      AdminUsername: admin
      AdminUserPassword: "{{resolve:secretsmanager:data-arch-redshift-pw}}" # TODO: create secret
      DbName: dataarch
      DefaultIamRoleArn: !GetAtt RedshiftRole.Arn

  # Compute for the namespace above: 128 RPUs in prod, 32 elsewhere,
  # reachable only from inside the VPC.
  # NOTE(review): Redshift Serverless workgroups require subnets spanning
  # at least 3 AZs — confirm the SubnetIds parameter satisfies this.
  RedshiftWorkgroup:
    Type: AWS::RedshiftServerless::Workgroup
    Properties:
      WorkgroupName: !Sub "data-arch-wg-${Environment}"
      NamespaceName: !Ref RedshiftNamespace
      BaseCapacity: !If [IsProd, 128, 32]
      SubnetIds: !Ref SubnetIds
      PubliclyAccessible: false

  # Default IAM role attached to the Redshift namespace; grants read-only
  # S3 access (e.g. for COPY from the data lake).
  RedshiftRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: redshift.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        # NOTE(review): account-wide S3 read; consider scoping to the
        # data-lake bucket if that is the only source.
        - arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess

  # ==========================================
  # Lambda Architecture: Speed Layer
  # ==========================================
  # Real-time views written by the Flink application (see
  # FlinkApplicationRole, which grants PutItem/BatchWriteItem here).
  # Generic pk/sk single-table schema, on-demand billing, PITR, KMS SSE.
  SpeedLayerDynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub "data-arch-rt-views-${Environment}"
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      SSESpecification:
        SSEEnabled: true
        SSEType: KMS

  # ==========================================
  # Kappa Architecture: Serving Store
  # ==========================================
  # An OpenSearch Serverless collection cannot be created until an
  # *encryption* security policy whose resource pattern matches the
  # collection name exists — without the policy below, collection
  # creation fails at deploy time.
  KappaOpenSearchEncryptionPolicy:
    Type: AWS::OpenSearchServerless::SecurityPolicy
    Properties:
      Name: !Sub "kappa-serving-${Environment}"
      Type: encryption
      Description: "Encryption policy for the Kappa serving collection"
      # Policy is a JSON string; AWSOwnedKey avoids needing a customer KMS key.
      Policy: !Sub >-
        {"Rules":[{"ResourceType":"collection","Resource":["collection/kappa-serving-${Environment}"]}],"AWSOwnedKey":true}

  KappaOpenSearchCollection:
    Type: AWS::OpenSearchServerless::Collection
    # Explicit ordering: the encryption policy must exist first.
    DependsOn: KappaOpenSearchEncryptionPolicy
    Properties:
      Name: !Sub "kappa-serving-${Environment}"
      Type: SEARCH
      Description: "Kappa Architecture serving store"

  # Key-value serving store for the Kappa pipeline; same pk/sk schema,
  # on-demand billing, PITR, and KMS SSE as the Lambda speed-layer table,
  # so the two architectures can be compared like-for-like.
  KappaDynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub "kappa-serving-${Environment}"
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      SSESpecification:
        SSEEnabled: true
        SSEType: KMS

  # ==========================================
  # Shared: Managed Flink Application
  # ==========================================
  # Execution role for Managed Service for Apache Flink (service principal
  # is still kinesisanalytics.amazonaws.com). Grants Kafka client access
  # plus write-only access to both serving tables.
  # NOTE(review): the Flink Application resource itself is not defined in
  # this template — presumably deployed separately; confirm.
  FlinkApplicationRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: kinesisanalytics.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: FlinkAccess
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # NOTE(review): Resource "*" grants topic/group access on
              # every MSK cluster in the account — consider scoping to
              # the MSKCluster ARN.
              - Effect: Allow
                Action:
                  - "kafka:DescribeCluster"
                  - "kafka:GetBootstrapBrokers"
                  - "kafka-cluster:*Topic*"
                  - "kafka-cluster:*Group*"
                  - "kafka-cluster:Connect"
                Resource: "*"
              # Write-only (no read/delete) into both serving tables.
              - Effect: Allow
                Action:
                  - "dynamodb:PutItem"
                  - "dynamodb:BatchWriteItem"
                Resource:
                  - !GetAtt SpeedLayerDynamoDBTable.Arn
                  - !GetAtt KappaDynamoDBTable.Arn

Outputs:
  MSKClusterArn:
    # !Ref on AWS::MSK::Cluster returns the cluster ARN.
    Value: !Ref MSKCluster
    Description: MSK Cluster ARN (shared by both architectures)
  DataLakeBucketName:
    Value: !Ref DataLakeBucket
    Description: S3 Data Lake bucket (Lambda Batch Layer)
  RedshiftWorkgroupEndpoint:
    Value: !GetAtt RedshiftWorkgroup.Workgroup.Endpoint.Address
    Description: Redshift Serverless endpoint (Lambda Batch Views)
  SpeedLayerTableName:
    Value: !Ref SpeedLayerDynamoDBTable
    Description: DynamoDB table (Lambda Speed Layer RT Views)
  KappaOpenSearchEndpoint:
    Value: !GetAtt KappaOpenSearchCollection.CollectionEndpoint
    Description: OpenSearch Serverless endpoint (Kappa Serving Store)
  KappaServingTableName:
    Value: !Ref KappaDynamoDBTable
    Description: DynamoDB table (Kappa Serving Store)
