in demo-apps/demo-kda-app/src/main/java/com/amazonaws/services/kinesisanalytics/DemoKDAApp.java [52:88]
/**
 * Builds a Kinesis-backed source of {@link EmployeeInfo} records and registers it on the
 * given execution environment under the operator name {@code "KinesisSource"}.
 *
 * <p>Recognised entries in {@code parameter} (defaults in parentheses):
 * <ul>
 *   <li>{@code AWS_REGION} ({@code us-east-2}) - region of the Kinesis stream</li>
 *   <li>{@code KINESIS_STREAM} ({@code AmazonKinesisStream1}) - name of the stream to read</li>
 *   <li>{@code SHARD_USE_ADAPTIVE_READS} ({@code false}) - when "true" (any letter case),
 *       adaptive reads are enabled and the two settings below are not applied</li>
 *   <li>{@code SHARD_GETRECORDS_INTERVAL_MILLIS} ({@code 1000}) - polling interval</li>
 *   <li>{@code SHARD_GETRECORDS_MAX} ({@code 10000}) - maximum records per fetch</li>
 * </ul>
 *
 * @param env       execution environment the source is added to
 * @param parameter job parameters supplying the settings listed above
 * @return the stream produced by the newly added Kinesis source
 * @throws Exception declared for callers; nothing in this method visibly throws a checked
 *                   exception itself
 */
public static DataStream<EmployeeInfo> createKinesisSource(StreamExecutionEnvironment env,
        ParameterTool parameter) throws Exception {
    // Kinesis consumer properties
    Properties kinesisConsumerConfig = new Properties();

    // region the Kinesis stream is located in
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION,
            parameter.get("AWS_REGION", "us-east-2"));

    // obtain credentials through the DefaultCredentialsProviderChain, which includes the instance metadata
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

    // Parse the flag case-insensitively so that "True"/"TRUE" are honoured instead of
    // silently falling back to fixed-interval polling.
    boolean useAdaptiveReads =
            Boolean.parseBoolean(parameter.get("SHARD_USE_ADAPTIVE_READS", "false"));

    if (useAdaptiveReads) {
        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
    } else {
        // interval between polls of the Kinesis stream (defaults to once every second)
        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
                parameter.get("SHARD_GETRECORDS_INTERVAL_MILLIS", "1000"));
        // maximum number of records to fetch in one call
        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
                parameter.get("SHARD_GETRECORDS_MAX", "10000"));
    }

    // create the Kinesis source
    return env.addSource(new FlinkKinesisConsumer<>(
            // read events from the Kinesis stream passed in as a parameter
            parameter.get("KINESIS_STREAM", "AmazonKinesisStream1"),
            // deserialize events into EmployeeInfo
            new EmployeeInfoDeserializationSchema(),
            // using the previously defined properties
            kinesisConsumerConfig
    )).name("KinesisSource");
}