private static DataStream createKinesisSource()

in src/main/java/com/amazonaws/kda/flink/starterkit/SessionProcessor.java [113:126]


	/**
	 * Builds a Kinesis-backed source of {@code String} records and registers it
	 * on the given execution environment.
	 *
	 * <p>
	 * The consumer configuration is assembled from these keys of
	 * {@code paramTool}:
	 * <ul>
	 * <li>{@code region} - stored under
	 * {@link ConsumerConfigConstants#AWS_REGION}</li>
	 * <li>{@code stream_init_position} - stored under
	 * {@link ConsumerConfigConstants#STREAM_INITIAL_POSITION}</li>
	 * <li>{@code stream_initial_timestamp} - read only when
	 * {@code stream_init_position} equals {@code AT_TIMESTAMP} (ignoring case);
	 * stored under
	 * {@link ConsumerConfigConstants#STREAM_INITIAL_TIMESTAMP}</li>
	 * <li>{@code input_stream_name} - name of the stream handed to the
	 * consumer</li>
	 * </ul>
	 *
	 * @param env       execution environment the source is added to
	 * @param paramTool application parameters holding the keys listed above
	 * @return the stream returned by {@code env.addSource(...)} for the new
	 *         consumer, deserialized with {@link SimpleStringSchema}
	 * @throws RuntimeException if any of the keys needed for the selected
	 *                          configuration is absent from {@code paramTool}
	 */
	private static DataStream<String> createKinesisSource(StreamExecutionEnvironment env, ParameterTool paramTool) {
		log.info("Creating Kinesis source from Application Properties");

		// getRequired reports the missing key by name; a plain get() would return
		// null and surface later as an anonymous NullPointerException, because
		// Properties does not accept null values.
		String region = paramTool.getRequired("region");
		String initialPosition = paramTool.getRequired("stream_init_position");

		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, initialPosition);

		// The timestamp is only required when the stream is read from a timestamp.
		if (StreamPosition.AT_TIMESTAMP.name().equalsIgnoreCase(initialPosition)) {
			inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
					paramTool.getRequired("stream_initial_timestamp"));
		}

		String streamName = paramTool.getRequired("input_stream_name");
		return env.addSource(new FlinkKinesisConsumer<>(streamName, new SimpleStringSchema(), inputProperties));
	}