in analytics/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java [76:102]
public static void main(String[] args) throws Exception {
final ParameterTool parameter = ParameterToolUtils.fromArgsAndApplicationProperties(args);
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final String region = parameter.get("Region", DEFAULT_REGION_NAME);
final String databaseName = parameter.get("TimestreamDbName", DEFAULT_DB_NAME);
final String tableName = parameter.get("TimestreamTableName", DEFAULT_TABLE_NAME);
final int batchSize = Integer.parseInt(parameter.get("TimestreamIngestBatchSize", "75"));
TimestreamInitializer timestreamInitializer = new TimestreamInitializer(region);
timestreamInitializer.createDatabase(databaseName);
timestreamInitializer.createTable(databaseName, tableName);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);
createKinesisSource(env, parameter)
.map(new JsonToTimestreamPayloadFn()).name("MaptoTimestreamPayload")
.process(new OffsetFutureTimestreamPoints()).name("UpdateFutureOffsetedTimestreamPoints")
.addSink(new TimestreamSink(region, databaseName, tableName, batchSize))
.name("TimeSeries<" + databaseName + ", " + tableName + ">");
// execute program
env.execute("Flink Streaming Java API Skeleton");
}