in src/main/java/com/amazonaws/kda/flink/starterkit/SessionProcessor.java [113:126]
private static DataStream<String> createKinesisSource(StreamExecutionEnvironment env, ParameterTool paramTool) {
log.info("Creating Kinesis source from Application Properties");
Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, paramTool.get("region"));
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
paramTool.get("stream_init_position"));
if (paramTool.get("stream_init_position").equalsIgnoreCase(StreamPosition.AT_TIMESTAMP.name())) {
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
paramTool.get("stream_initial_timestamp"));
}
return env.addSource(new FlinkKinesisConsumer<>(paramTool.get("input_stream_name"), new SimpleStringSchema(),
inputProperties));
}