in hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java [75:109]
/**
 * Assembles the "Hourly Tips" pipeline and runs it.
 *
 * <p>Fares read from {@code source} are keyed by driver and handed to {@code AddTips} over
 * one-hour tumbling event-time windows. A second, non-keyed one-hour window then keeps the
 * record with the largest value in tuple field 2, which is forwarded to {@code sink}.
 *
 * @return the result reported by the execution environment once the job has run
 * @throws Exception if building or executing the job fails
 */
public JobExecutionResult execute() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Fares are expected to arrive in timestamp order, so a monotonous-timestamp
    // watermark strategy is used; the event time is read from each fare.
    WatermarkStrategy<TaxiFare> inOrderFares =
            WatermarkStrategy.<TaxiFare>forMonotonousTimestamps()
                    .withTimestampAssigner(
                            (fare, recordTimestamp) -> fare.getEventTimeMillis());

    // Start the data generator and attach timestamps/watermarks to its output.
    DataStream<TaxiFare> timestampedFares =
            env.addSource(source).assignTimestampsAndWatermarks(inOrderFares);

    // One result per driver per hour, produced by AddTips.
    DataStream<Tuple3<Long, Long, Float>> tipsPerDriver =
            timestampedFares
                    .keyBy((TaxiFare fare) -> fare.driverId)
                    .window(TumblingEventTimeWindows.of(Time.hours(1)))
                    .process(new AddTips());

    // Across all drivers, keep the record whose field 2 is largest within each hour.
    DataStream<Tuple3<Long, Long, Float>> topTipsPerHour =
            tipsPerDriver
                    .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
                    .maxBy(2);

    /* Exercise: explore how the alternative below (commented out) behaves.
     * In what ways is it the same as, and different from, the windowAll solution above?
     */
    // DataStream<Tuple3<Long, Long, Float>> topTipsPerHour =
    //         tipsPerDriver.keyBy(t -> t.f0).maxBy(2);

    topTipsPerHour.addSink(sink);

    // Run the assembled pipeline.
    return env.execute("Hourly Tips");
}