fun main()

in analytics-kotlin/src/main/kotlin/services/kinesisanalytics/StreamingJob.kt [83:106]


    fun main(args: Array<String>) {
        val parameter: ParameterTool = ParameterToolUtils.fromArgsAndApplicationProperties(args)

        // set up the streaming execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
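
        // read job configuration (Timestream region, database, table, and ingest batch size)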
        val region = parameter["Region", DEFAULT_REGION_NAME]
        val databaseName = parameter["TimestreamDbName", DEFAULT_DB_NAME]
        val tableName = parameter["TimestreamTableName", DEFAULT_TABLE_NAME]
        val batchSize = parameter["TimestreamIngestBatchSize", "75"].toInt()

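        // create the target Timestream database and table if they do not already exist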
        createDatabaseAndTableIfNotExist(region, databaseName, tableName)

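        // use event time and emit watermarks every 1000 ms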
        env.streamTimeCharacteristic = TimeCharacteristic.EventTime
        env.config.autoWatermarkInterval = 1000L

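        // pipeline: Kinesis source -> map JSON records to Timestream payloads ->
        // offset points with future timestamps -> batched Timestream sink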
        createKinesisSource(env, parameter)
            .map(JsonToTimestreamPayloadFn()).name("MapToTimestreamPayload")
            .process(OffsetFutureTimestreamPoints()).name("OffsetFutureTimestreamPoints")
            .addSink(TimestreamSink(region, databaseName, tableName, batchSize))
            .name("TimestreamSink<$databaseName, $tableName>")

        // execute the streaming pipeline
        env.execute("Kinesis to Timestream Streaming Job")
    }
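
For context, createKinesisSource is defined elsewhere in StreamingJob.kt and is not part of the excerpt above. A minimal sketch of such a helper, assuming the standard Flink Kinesis connector (FlinkKinesisConsumer) and a hypothetical "InputStreamName" runtime property, might look like the following; the actual helper in the repository may be configured differently.

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants
    import java.util.Properties

    // Sketch only: the "InputStreamName" property, the LATEST start position, and the
    // operator name are assumptions; DEFAULT_REGION_NAME is the constant used in main() above.
    fun createKinesisSource(env: StreamExecutionEnvironment, parameter: ParameterTool): DataStream<String> {
        val consumerConfig = Properties()
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, parameter["Region", DEFAULT_REGION_NAME])
        consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")

        // read raw JSON records from the configured Kinesis data stream
        return env
            .addSource(
                FlinkKinesisConsumer(
                    parameter.getRequired("InputStreamName"),
                    SimpleStringSchema(),
                    consumerConfig
                )
            )
            .name("KinesisSource")
    }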