public static DataStream createKinesisSource()

in demo-apps/demo-kda-app/src/main/java/com/amazonaws/services/kinesisanalytics/DemoKDAApp.java [52:88]


    public static DataStream<EmployeeInfo> createKinesisSource(StreamExecutionEnvironment env,
                                                               ParameterTool parameter) throws Exception {

        //set Kinesis consumer properties
        Properties kinesisConsumerConfig = new Properties();
        //set the region the Kinesis stream is located in
        kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION,
                parameter.get("AWS_REGION", "us-east-2"));
        //obtain credentials through the DefaultCredentialsProviderChain, which includes the instance metadata
        kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

        String adaptiveReadSettingStr = parameter.get("SHARD_USE_ADAPTIVE_READS", "false");

        if (adaptiveReadSettingStr.equals("true")) {
            kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
        } else {
            //poll new events from the Kinesis stream once every second
            kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
                    parameter.get("SHARD_GETRECORDS_INTERVAL_MILLIS", "1000"));
            // max records to get in shot
            kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
                    parameter.get("SHARD_GETRECORDS_MAX", "10000"));
        }


        //create Kinesis source
        DataStream<EmployeeInfo> kinesisStream = env.addSource(new FlinkKinesisConsumer<EmployeeInfo>(
                //read events from the Kinesis stream passed in as a parameter
                parameter.get("KINESIS_STREAM", "AmazonKinesisStream1"),
                //deserialize events with EventSchema
                new EmployeeInfoDeserializationSchema(),
                //using the previously defined properties
                kinesisConsumerConfig
        )).name("KinesisSource");

        return kinesisStream;
    }