in TumblingWindow/src/main/java/com/amazonaws/services/kinesisanalytics/TumblingWindowStreamingJob.java [52:73]
/**
 * Builds and runs the tumbling-window streaming job.
 *
 * <p>Pipeline, in order:
 * <ol>
 *   <li>read raw string records from the source returned by
 *       {@code createSourceFromStaticConfig(env)};</li>
 *   <li>parse each record as JSON and emit a {@code (TICKER, 1)} tuple;</li>
 *   <li>key the stream by the ticker field and group it into 5-second
 *       tumbling processing-time windows;</li>
 *   <li>sum the per-record {@code 1}s so each window yields one count per ticker;</li>
 *   <li>format every result as {@code "<ticker>,<count>\n"} and hand it to the
 *       sink returned by {@code createSinkFromStaticConfig()}.</li>
 * </ol>
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception propagated from {@code env.execute(...)} when the job
 *                   cannot be built or fails while running
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> input = createSourceFromStaticConfig(env);
    ObjectMapper jsonParser = new ObjectMapper();

    input.map(value -> { // Parse the JSON
                JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
                // NOTE(review): toString() on a JsonNode presumably yields the JSON
                // rendering of the value (a text ticker keeps its quotes), and a record
                // without a "TICKER" field presumably fails here on a null node --
                // confirm both against the expected input/output format.
                return new Tuple2<>(jsonNode.get("TICKER").toString(), 1);
            })
            // Lambdas lose their generic type information, so declare the tuple type explicitly.
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            // Logically partition the stream per ticker. A key selector is used instead of
            // the positional keyBy(0), which is deprecated since Flink 1.11.
            .keyBy(value -> value.f0)
            // Tumbling window definition (Flink 1.13 API)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            // Sum the counts (tuple field 1) per ticker within each window
            .sum(1)
            .map(value -> value.f0 + "," + value.f1.toString() + "\n")
            .addSink(createSinkFromStaticConfig());

    env.execute("Word Count");
}