public static DataStream createKinesisSource()

in analytics/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java [41:74]


    public static DataStream<String> createKinesisSource(StreamExecutionEnvironment env, ParameterTool parameter) {

        //set Kinesis consumer properties
        Properties kinesisConsumerConfig = new Properties();
        //set the region the Kinesis stream is located in
        kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION,
                parameter.get("Region", DEFAULT_REGION_NAME));
        //obtain credentials through the DefaultCredentialsProviderChain, which includes credentials from instance metadata
        kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

        String adaptiveReadSettingStr = parameter.get("SHARD_USE_ADAPTIVE_READS", "false");

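        //adaptive reads let the consumer size each GetRecords call based on the
        //observed record sizes, instead of the fixed polling settings used below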
        if (adaptiveReadSettingStr.equals("true")) {
            kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
        } else {
            //poll new events from the Kinesis stream once every second
            kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
                    parameter.get("SHARD_GETRECORDS_INTERVAL_MILLIS", "1000"));
            //max records to fetch in a single GetRecords call
            kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
                    parameter.get("SHARD_GETRECORDS_MAX", "10000"));
        }

        //create and return the Kinesis source
        return env.addSource(new FlinkKinesisConsumer<>(
                //read events from the Kinesis stream passed in as a parameter
                parameter.get("InputStreamName", DEFAULT_STREAM_NAME),
                //deserialize events as plain strings with SimpleStringSchema
                new SimpleStringSchema(),
                //using the previously defined properties
                kinesisConsumerConfig
        )).name("KinesisSource");
    }
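
A minimal sketch of how this source could be wired into the job, assuming a conventional main method in StreamingJob and that the job parameters arrive as a ParameterTool from the command line (the actual job may load them from the Kinesis Data Analytics runtime properties instead; the job name below is illustrative):

    public static void main(String[] args) throws Exception {
        //set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //read job parameters from the command line (assumption: the real job may
        //obtain them from the Kinesis Data Analytics runtime properties instead)
        ParameterTool parameter = ParameterTool.fromArgs(args);

        //create the Kinesis source and attach a trivial downstream operator
        DataStream<String> events = createKinesisSource(env, parameter);
        events.print();

        env.execute("Kinesis streaming job");
    }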