in cdk/stacks/kinesis/amazon_kinesis_analytics_stack.py [0:0]
def __init__(self, scope: core.Construct, construct_id: str, stream: kinesis.IStream, db_name: str, table_name: str,
             kda_role: iam.IRole, log_group_name: str, log_stream_name: str, asset: assets.Asset, **kwargs) -> None:
    """Provision a Kinesis Data Analytics (Flink 1.11) application.

    The application reads records from *stream* and — judging by the runtime
    properties it is handed — writes them to the Timestream table
    *db_name*/*table_name* (the actual sink logic lives in the packaged Flink
    code, not in this stack). Flink logs are routed to the pre-existing
    CloudWatch log group/stream identified by *log_group_name* and
    *log_stream_name*.

    :param scope: parent construct.
    :param construct_id: logical id of this stack.
    :param stream: source Kinesis stream the Flink job consumes.
    :param db_name: Timestream database name passed to the job.
    :param table_name: Timestream table name passed to the job.
    :param kda_role: IAM role the KDA service assumes to run the job.
    :param log_group_name: existing CloudWatch log group for Flink logs.
    :param log_stream_name: existing log stream inside that group.
    :param asset: S3 asset holding the zipped Flink application code.
    """
    super().__init__(scope, construct_id, **kwargs)

    # Operator-tunable flush threshold, surfaced as a CFN parameter and
    # forwarded to the Flink job via its property map below.
    batch_size = core.CfnParameter(
        self, "batchSizeParam", type="Number",
        min_value=1, max_value=100, default=75,
        description="Number of records ingested from stream before flushing to "
                    "Timestream database")

    # Application code: the packaged Flink artifact uploaded as an S3 asset.
    code_config = kda.CfnApplicationV2.ApplicationCodeConfigurationProperty(
        code_content=kda.CfnApplicationV2.CodeContentProperty(
            s3_content_location=kda.CfnApplicationV2.S3ContentLocationProperty(
                bucket_arn=asset.bucket.bucket_arn,
                file_key=asset.s3_object_key,
            )
        ),
        code_content_type="ZIPFILE",
    )

    # Runtime properties the Flink job reads at startup.
    env_props = kda.CfnApplicationV2.EnvironmentPropertiesProperty(
        property_groups=[
            kda.CfnApplicationV2.PropertyGroupProperty(
                property_group_id="FlinkApplicationProperties",
                property_map={
                    "InputStreamName": stream.stream_name,
                    "Region": core.Aws.REGION,
                    "TimestreamDbName": db_name,
                    "TimestreamTableName": table_name,
                    "TimestreamIngestBatchSize": batch_size.value_as_number,
                },
            )
        ]
    )

    # Fixed, single-KPU Flink runtime: custom monitoring, no autoscaling,
    # checkpoints every 60 s with a 60 s minimum pause between them.
    flink_config = kda.CfnApplicationV2.FlinkApplicationConfigurationProperty(
        monitoring_configuration=kda.CfnApplicationV2.MonitoringConfigurationProperty(
            configuration_type="CUSTOM",
            log_level="INFO",
            metrics_level="TASK",
        ),
        parallelism_configuration=kda.CfnApplicationV2.ParallelismConfigurationProperty(
            configuration_type="CUSTOM",
            auto_scaling_enabled=False,
            parallelism=1,
            parallelism_per_kpu=1,
        ),
        checkpoint_configuration=kda.CfnApplicationV2.CheckpointConfigurationProperty(
            configuration_type="CUSTOM",
            # the properties below are optional
            checkpointing_enabled=True,
            checkpoint_interval=60_000,
            min_pause_between_checkpoints=60_000,
        ),
    )

    flink_app = kda.CfnApplicationV2(
        self, "KdaApplication",
        runtime_environment="FLINK-1_11",
        service_execution_role=kda_role.role_arn,
        application_name=core.Aws.STACK_NAME,
        application_configuration=kda.CfnApplicationV2.ApplicationConfigurationProperty(
            application_code_configuration=code_config,
            environment_properties=env_props,
            application_snapshot_configuration=kda.CfnApplicationV2.ApplicationSnapshotConfigurationProperty(
                snapshots_enabled=False,
            ),
            flink_application_configuration=flink_config,
        ),
    )

    # Route application logs to the externally created log stream. The log
    # group/stream are not constructs here, so the ARN is assembled by hand.
    log_stream_arn = "arn:{}:logs:{}:{}:log-group:{}:log-stream:{}".format(
        core.Aws.PARTITION, core.Aws.REGION, core.Aws.ACCOUNT_ID,
        log_group_name, log_stream_name)
    logging_option = kda.CfnApplicationCloudWatchLoggingOptionV2(
        self, "FlinkLogging",
        application_name=flink_app.application_name,
        cloud_watch_logging_option=kda.CfnApplicationCloudWatchLoggingOptionV2.CloudWatchLoggingOptionProperty(
            log_stream_arn=log_stream_arn))
    # The logging option references the application, so create it second.
    logging_option.add_depends_on(flink_app)

    # Exported so other stacks can locate the application by name.
    core.CfnOutput(self, "KdaApplicationName",
                   value=flink_app.application_name,
                   export_name="KdaApplicationName")