in S3ParquetSink/src/main/java/com/amazonaws/services/kinesisanalytics/S3StreamingParquetSinkJob.java [55:76]
/**
 * Builds and runs the streaming job: reads JSON records from the configured source,
 * counts records per {@code TICKER} value over one-minute tumbling processing-time
 * windows, and hands each per-ticker count to the configured sink as a {@code TickCount}.
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception if building or executing the Flink job fails
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> input = createSourceFromStaticConfig(env);
    ObjectMapper jsonParser = new ObjectMapper();

    input.map(value -> {
                // Parse the JSON record and pair its ticker with a count of 1.
                JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
                JsonNode ticker = jsonNode.get("TICKER");
                if (ticker == null) {
                    // Fail with context instead of a bare NullPointerException.
                    throw new IllegalArgumentException("Input record has no TICKER field: " + value);
                }
                // asText() yields the raw value; toString() would keep the JSON quotes.
                return new Tuple2<>(ticker.asText(), 1);
            }).returns(Types.TUPLE(Types.STRING, Types.INT))
            // Partition by ticker with a KeySelector; index-based keyBy(int) is deprecated.
            // NOTE(review): the key type is now String rather than a Tuple wrapper —
            // confirm no savepoint taken with the old key type needs to be restored.
            .keyBy(t -> t.f0)
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // Tumbling one-minute window
            .sum(1) // Count the appearances by ticker per window
            .map(t -> new TickCount(t.f0, t.f1))
            .addSink(createS3SinkFromStaticConfig())
            .name("S3 Parquet Sink");

    env.execute("Flink S3 Streaming Sink Job");
}