def __init__()

in cdk/stacks/kinesis/amazon_kinesis_analytics_source_stack.py [0:0]


    def __init__(self, scope: core.Construct, construct_id: str,
                 stream: kinesis.IStream, kda_path: str,
                 database: timestream.CfnDatabase, table: timestream.CfnTable,
                 **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        asset = assets.Asset(self, "flink-source", path=kda_path)

        log_group = logs.LogGroup(self, "KdaLogGroup",
                                  retention=RetentionDays.FIVE_DAYS,
                                  removal_policy=RemovalPolicy.DESTROY)
        log_stream = log_group.add_stream("KdaLogStream")

        kda_role = iam.Role(self, "KdaRole",
                            assumed_by=iam.ServicePrincipal("kinesisanalytics.amazonaws.com"),
                            )

        asset.grant_read(kda_role)
        stream.grant_read(kda_role)
        cloudwatch.Metric.grant_put_metric_data(kda_role)
        log_group.grant(kda_role, "logs:DescribeLogStreams")
        log_group.grant_write(kda_role)

        kda_role.add_to_policy(iam.PolicyStatement(
            actions=["timestream:DescribeEndpoints",
                     "timestream:ListTables",
                     "timestream:ListDatabases",
                     "timestream:DescribeTable",
                     "timestream:DescribeDatabase",
                     ],
            resources=["*"]
        ))

        kda_role.add_to_policy(iam.PolicyStatement(
            actions=["timestream:*Database"],
            resources=[database.attr_arn]
        ))

        kda_role.add_to_policy(iam.PolicyStatement(
            actions=["timestream:*Table", "timestream:WriteRecords"],
            resources=[table.attr_arn]
        ))

        kda_role.add_to_policy(iam.PolicyStatement(
            actions=["kms:DescribeKey"],
            resources=["*"]
        ))

        kda_role.add_to_policy(iam.PolicyStatement(
            actions=["kms:CreateGrant"],
            resources=["*"],
            conditions={
                "ForAnyValue:StringEquals": {
                    "kms:EncryptionContextKeys": "aws:timestream:database-name"
                },
                "Bool": {
                    "kms:GrantIsForAWSResource": True
                },
                "StringLike": {
                    "kms:ViaService": "timestream.*.amazonaws.com"
                }
            }
        ))

        kda_role.add_to_policy(iam.PolicyStatement(actions=["kinesis:ListShards"], resources=[stream.stream_arn]))

        self._asset = asset
        self._kda_role = kda_role
        self._log_group_name = log_group.log_group_name
        self._log_stream_name = log_stream.log_stream_name