in demo-apps/demo-kda-app/src/main/java/com/amazonaws/services/kinesisanalytics/DemoKDAApp.java [90:141]
/**
 * Application entry point: assembles the streaming pipeline and submits it for execution.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>obtain the execution environment and switch it to event time;</li>
 *   <li>load the application parameters, from {@code args} when the environment is a
 *       {@code LocalStreamEnvironment}, otherwise from the {@code "FlinkApplicationProperties"}
 *       group supplied by {@code KinesisAnalyticsRuntime};</li>
 *   <li>create the Kinesis source via {@code createKinesisSource};</li>
 *   <li>assign timestamps from {@code EmployeeInfo.getEventtimestamp()} with a bounded
 *       out-of-orderness of {@code MAX_OUT_OF_ORDERNESS_IN_MS} milliseconds;</li>
 *   <li>key by {@code "companyid"}, window by {@code TUMBLING_WINDOW_PERIOD_IN_MS} milliseconds,
 *       apply {@code CountWindowFn} and print the result;</li>
 *   <li>execute the job under the name {@code "Demo KDA Kinesis Consumer"}.</li>
 * </ol>
 *
 * @param args command-line arguments; only consulted when running in a local environment
 * @throws RuntimeException if the {@code "FlinkApplicationProperties"} group is absent when
 *                          running outside a local environment
 * @throws Exception        propagated from pipeline construction or job execution
 */
public static void main(String[] args) throws Exception {
    // Streaming execution environment, configured for event time rather than processing time.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Resolve the application parameters according to where the job is running.
    final ParameterTool parameter;
    if (!(env instanceof LocalStreamEnvironment)) {
        // Not local: the parameters come from the Kinesis Analytics runtime.
        Map<String, Properties> runtimeProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties flinkProperties = runtimeProperties.get("FlinkApplicationProperties");
        if (flinkProperties == null) {
            throw new RuntimeException("Unable to load FlinkApplicationProperties properties from the Kinesis Analytics Runtime.");
        }
        parameter = ParameterToolUtils.fromApplicationProperties(flinkProperties);
    } else {
        // Local run: the parameters come straight from the command line.
        parameter = ParameterTool.fromArgs(args);
    }

    // Source: employee records read from Kinesis.
    DataStream<EmployeeInfo> rawEmployeeStream = createKinesisSource(env, parameter);
    // For debugging, the raw records can be dumped with rawEmployeeStream.print().

    // Timestamp/watermark assigner that takes the event time carried by each record.
    BoundedOutOfOrdernessTimestampExtractor<EmployeeInfo> eventTimeExtractor =
        new BoundedOutOfOrdernessTimestampExtractor<EmployeeInfo>(
            Time.milliseconds(MAX_OUT_OF_ORDERNESS_IN_MS)) {
            @Override
            public long extractTimestamp(EmployeeInfo record) {
                return record.getEventtimestamp();
            }
        };
    DataStream<EmployeeInfo> employeeStream =
        rawEmployeeStream.assignTimestampsAndWatermarks(eventTimeExtractor);

    // Simple aggregation: number of employees (f1) per company (f0) per window.
    SingleOutputStreamOperator<Tuple2<Long, Long>> procStream =
        employeeStream
            .keyBy("companyid")
            .timeWindow(Time.milliseconds(TUMBLING_WINDOW_PERIOD_IN_MS))
            .apply(new CountWindowFn());
    procStream.print();

    // Submit the job.
    env.execute("Demo KDA Kinesis Consumer");
}