def __init__()

in cdk/stacks/kinesis/amazon_kinesis_analytics_stack.py [0:0]


    def __init__(self, scope: core.Construct, construct_id: str, stream: kinesis.IStream, db_name: str, table_name: str,
                 kda_role: iam.IRole, log_group_name: str, log_stream_name: str, asset: assets.Asset, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        batch_size_param = core.CfnParameter(self, "batchSizeParam", type="Number",
                                             min_value=1, max_value=100, default=75,
                                             description="Number of records ingested from stream before flushing to "
                                                         "Timestream database")
        kda_application = kda.CfnApplicationV2(
            self, "KdaApplication",
            runtime_environment="FLINK-1_11",
            service_execution_role=kda_role.role_arn,
            application_name=core.Aws.STACK_NAME,
            application_configuration=
            kda.CfnApplicationV2.ApplicationConfigurationProperty(
                application_code_configuration=kda.CfnApplicationV2.ApplicationCodeConfigurationProperty(
                    code_content=kda.CfnApplicationV2.CodeContentProperty(
                        s3_content_location=kda.CfnApplicationV2.S3ContentLocationProperty(
                            bucket_arn=asset.bucket.bucket_arn,
                            file_key=asset.s3_object_key
                        )
                    ),
                    code_content_type="ZIPFILE"
                ),
                environment_properties=kda.CfnApplicationV2.EnvironmentPropertiesProperty(
                    property_groups=[
                        kda.CfnApplicationV2.PropertyGroupProperty(
                            property_group_id="FlinkApplicationProperties",
                            property_map={
                                "InputStreamName": stream.stream_name,
                                "Region": core.Aws.REGION,
                                "TimestreamDbName": db_name,
                                "TimestreamTableName": table_name,
                                "TimestreamIngestBatchSize": batch_size_param.value_as_number
                            }
                        )
                    ]
                ),
                application_snapshot_configuration=kda.CfnApplicationV2.ApplicationSnapshotConfigurationProperty(
                    snapshots_enabled=False
                ),
                flink_application_configuration=kda.CfnApplicationV2.FlinkApplicationConfigurationProperty(
                    monitoring_configuration=kda.CfnApplicationV2.MonitoringConfigurationProperty(
                        configuration_type="CUSTOM",
                        log_level="INFO",
                        metrics_level="TASK"
                    ),
                    parallelism_configuration=kda.CfnApplicationV2.ParallelismConfigurationProperty(
                        configuration_type="CUSTOM",
                        auto_scaling_enabled=False,
                        parallelism=1,
                        parallelism_per_kpu=1
                    ),
                    checkpoint_configuration=kda.CfnApplicationV2.CheckpointConfigurationProperty(
                        configuration_type="CUSTOM",
                        # the properties below are optional
                        checkpointing_enabled=True,
                        checkpoint_interval=60_000,
                        min_pause_between_checkpoints=60_000
                    )
                )
            )
        )

        kda_logging = kda.CfnApplicationCloudWatchLoggingOptionV2(
            self, "FlinkLogging",
            application_name=kda_application.application_name,
            cloud_watch_logging_option=kda.CfnApplicationCloudWatchLoggingOptionV2.CloudWatchLoggingOptionProperty(
                log_stream_arn="arn:{}:logs:{}:{}:log-group:{}:log-stream:{}".format(
                    core.Aws.PARTITION, core.Aws.REGION, core.Aws.ACCOUNT_ID,
                    log_group_name, log_stream_name)))

        kda_logging.add_depends_on(kda_application)

        core.CfnOutput(self, "KdaApplicationName", value=kda_application.application_name,
                       export_name="KdaApplicationName")