in analytics-kotlin/src/main/kotlin/services/kinesisanalytics/StreamingJob.kt [83:106]
/**
 * Entry point of the streaming job.
 *
 * Resolves the job settings through
 * [ParameterToolUtils.fromArgsAndApplicationProperties], calls
 * [createDatabaseAndTableIfNotExist] for the configured target, and then wires
 * the pipeline: Kinesis source -> [JsonToTimestreamPayloadFn] ->
 * [OffsetFutureTimestreamPoints] -> [TimestreamSink].
 *
 * Recognised parameters and the fallback used when one is absent:
 * - `Region`: `DEFAULT_REGION_NAME`
 * - `TimestreamDbName`: `DEFAULT_DB_NAME`
 * - `TimestreamTableName`: `DEFAULT_TABLE_NAME`
 * - `TimestreamIngestBatchSize`: `"75"` (parsed with `toInt()`, so a
 *   non-numeric value throws [NumberFormatException])
 *
 * @param args command-line arguments handed to the parameter resolver.
 */
fun main(args: Array<String>) {
    val params: ParameterTool = ParameterToolUtils.fromArgsAndApplicationProperties(args)

    // Streaming execution environment the whole job graph is attached to.
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    // Resolve every setting up front so a malformed batch size fails before
    // createDatabaseAndTableIfNotExist is invoked.
    val awsRegion = params.get("Region", DEFAULT_REGION_NAME)
    val dbName = params.get("TimestreamDbName", DEFAULT_DB_NAME)
    val table = params.get("TimestreamTableName", DEFAULT_TABLE_NAME)
    val ingestBatchSize = params.get("TimestreamIngestBatchSize", "75").toInt()

    createDatabaseAndTableIfNotExist(awsRegion, dbName, table)

    // Event-time processing with an automatic watermark interval of 1000
    // (milliseconds per the Flink ExecutionConfig API).
    env.streamTimeCharacteristic = TimeCharacteristic.EventTime
    env.config.autoWatermarkInterval = 1000L

    // Stage 1: read from Kinesis and map each element with JsonToTimestreamPayloadFn.
    val payloads = createKinesisSource(env, params)
        .map(JsonToTimestreamPayloadFn())
        .name("MaptoTimestreamPayload")

    // Stage 2: run the mapped elements through OffsetFutureTimestreamPoints.
    val offsetPoints = payloads
        .process(OffsetFutureTimestreamPoints())
        .name("UpdateFutureOffsetedTimestreamPoints")

    // Stage 3: hand the result to the Timestream sink.
    offsetPoints
        .addSink(TimestreamSink(awsRegion, dbName, table, ingestBatchSize))
        .name("TimestreamSink<$dbName, $table>")

    // Submit the assembled job graph for execution.
    env.execute("Flink Streaming Java API Skeleton")
}