in analytics/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java [41:74]
/**
 * Builds a Kinesis consumer from the job parameters and adds it to the environment as a source.
 *
 * <p>Parameters read from {@code parameter} (each falls back to the listed default when absent):
 * <ul>
 *   <li>{@code Region} - AWS region of the stream; default {@code DEFAULT_REGION_NAME}.</li>
 *   <li>{@code InputStreamName} - name of the stream to read; default {@code DEFAULT_STREAM_NAME}.</li>
 *   <li>{@code SHARD_USE_ADAPTIVE_READS} - boolean flag, parsed case-insensitively; default {@code false}.</li>
 *   <li>{@code SHARD_GETRECORDS_INTERVAL_MILLIS} - only applied when adaptive reads are off; default {@code 1000}.</li>
 *   <li>{@code SHARD_GETRECORDS_MAX} - only applied when adaptive reads are off; default {@code 10000}.</li>
 * </ul>
 *
 * @param env       execution environment the source is added to
 * @param parameter job parameters supplying the settings listed above
 * @return a stream of records deserialized as strings, from a source named {@code KinesisSource}
 */
public static DataStream<String> createKinesisSource(StreamExecutionEnvironment env, ParameterTool parameter) {
    Properties consumerProps = new Properties();

    // Region the Kinesis stream is located in.
    consumerProps.setProperty(
            AWSConfigConstants.AWS_REGION,
            parameter.get("Region", DEFAULT_REGION_NAME));

    // "AUTO" lets the connector resolve credentials itself rather than taking explicit keys here.
    consumerProps.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

    // Boolean.parseBoolean ignores case, so "True"/"TRUE" enable the flag as well as "true";
    // any other value (including a missing parameter) leaves adaptive reads disabled.
    boolean useAdaptiveReads =
            Boolean.parseBoolean(parameter.get("SHARD_USE_ADAPTIVE_READS", "false"));

    if (useAdaptiveReads) {
        // The fixed polling interval and batch size below are intentionally not set in this mode.
        consumerProps.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
    } else {
        // Interval between polls of the stream, in milliseconds (once per second by default).
        consumerProps.setProperty(
                ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
                parameter.get("SHARD_GETRECORDS_INTERVAL_MILLIS", "1000"));

        // Maximum number of records to fetch in one shot.
        consumerProps.setProperty(
                ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
                parameter.get("SHARD_GETRECORDS_MAX", "10000"));
    }

    // Stream to read from, as passed in the job parameters.
    String streamName = parameter.get("InputStreamName", DEFAULT_STREAM_NAME);

    // Records are deserialized with SimpleStringSchema, using the properties assembled above.
    FlinkKinesisConsumer<String> consumer =
            new FlinkKinesisConsumer<>(streamName, new SimpleStringSchema(), consumerProps);

    return env.addSource(consumer).name("KinesisSource");
}