in FirehoseSink/src/main/java/com/amazonaws/services/kinesisanalytics/FirehoseSinkStreamingJob.java [62:80]
/**
 * Entry point of the streaming job.
 *
 * <p>Builds a pipeline that reads JSON records from the source returned by
 * {@code createSourceFromStaticConfig}, extracts the {@code TICKER} and
 * {@code PRICE} fields, computes the minimum price per ticker over a sliding
 * window (10 seconds long, advancing every 5 seconds), formats each result as
 * {@code "<ticker>: min - <price>\n"} and writes it to the sink returned by
 * {@code createFirehoseSinkFromStaticConfig}.
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> input = createSourceFromStaticConfig(env);

    // Parse each JSON record into a (ticker, price) tuple.
    // NOTE(review): a record lacking TICKER or PRICE presumably makes
    // jsonNode.get(...) return null and fail here -- confirm the input schema.
    DataStream<Tuple2<String, Double>> tickerPrices = input
            .map(value -> {
                JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
                return new Tuple2<>(jsonNode.get("TICKER").asText(),
                        jsonNode.get("PRICE").asDouble());
            })
            // The lambda's generic Tuple2 result type is declared explicitly.
            .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));

    tickerPrices
            .keyBy(0) // Logically partition the stream per stock symbol (tuple field 0)
            .timeWindow(Time.seconds(10), Time.seconds(5)) // Sliding window: 10 s size, 5 s slide
            .min(1) // Minimum price (tuple field 1) over the window
            .map(value -> value.f0 + ": min - " + value.f1.toString() + "\n")
            .addSink(createFirehoseSinkFromStaticConfig()); // Write to Firehose Delivery Stream

    // Job name; the original literal was misspelled "Mininum".
    env.execute("Minimum Stock Price");
}