in CustomMetrics/RecordCount/src/main/java/com/amazonaws/services/kinesisanalytics/RecordCountApplication.java [27:77]
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Map<String, Properties> runConfigurations = KinesisAnalyticsRuntime.getApplicationProperties();
// Get the Kinesis Consumer properties from runtime configuration
Properties inputConfig = runConfigurations.get("consumer.config.0");
String inputStreamName = inputConfig.getProperty("input.stream.name");
String inputRegion = inputConfig.getProperty("aws.region");
String inputStartingPosition = inputConfig.getProperty("flink.stream.initpos");
Properties consumerProperty = new Properties();
consumerProperty.put("aws.region", inputRegion);
consumerProperty.put("flink.stream.initpos", inputStartingPosition);
// Add the Kinesis Consumer as input
DataStream<String> kinesisInput =
env.addSource(new FlinkKinesisConsumer(inputStreamName, new SimpleStringSchema(), consumerProperty));
// Add the NoOpMapperFunction to publish custom 'ReceivedRecords' metric before filtering
DataStream<String> noopMapperFunctionBeforeFilter = kinesisInput.map(new NoOpMapperFunction("ReceivedRecords"));
// Add the FilterFunction to filter the records based on MinSpeed (i.e. 106)
DataStream<String> kinesisProcessed = noopMapperFunctionBeforeFilter.filter(new FilterFunction<String>() {
public boolean filter(String value) throws Exception {
return RecordSchemaHelper.isGreaterThanMinSpeed(value);
}
});
// Add the NoOpMapperFunction to publish custom 'FilteredRecords' metric after filtering
DataStream<String> noopMapperFunctionAfterFilter =
kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));
// Get the Kinesis Producer properties from runtime configuration
Properties outputConfig = runConfigurations.get("producer.config.0");
String outputStreamName = outputConfig.getProperty("output.stream.name");
String shardCount = outputConfig.getProperty("shard.count");
String outputRegion = outputConfig.getProperty("aws.region");
Properties producerConfig = new Properties();
producerConfig.put("aws.region", outputRegion);
producerConfig.put("AggregationEnabled", "false");
// Add the Kinesis Producer as output
FlinkKinesisProducer<String> kinesisOutput = new FlinkKinesisProducer(new SimpleStringSchema(), producerConfig);
kinesisOutput.setDefaultStream(outputStreamName);
kinesisOutput.setDefaultPartition(RandomStringUtils.random(Integer.valueOf(shardCount)));
noopMapperFunctionAfterFilter.addSink(kinesisOutput);
LOG.info("Starting flink job: {} with using input kinesis stream: {} with initial position: {} and output"
+ " kinesis stream: {} with shard count: {}", new Object[] { "RecordCountApplication",
inputStreamName, inputStartingPosition, outputStreamName, shardCount});
env.execute("RecordCountApplication Job");
}