in hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala [53:82]
def execute(): JobExecutionResult = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// the taxi fare stream is in order, by timestamp
val watermarkStrategy = WatermarkStrategy
.forMonotonousTimestamps[TaxiFare]()
.withTimestampAssigner(new SerializableTimestampAssigner[TaxiFare] {
override def extractTimestamp(fare: TaxiFare, streamRecordTimestamp: Long): Long =
fare.getEventTimeMillis
})
// setup the pipeline
env
.addSource(source)
.assignTimestampsAndWatermarks(watermarkStrategy)
.map((f: TaxiFare) => (f.driverId, f.tip))
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.reduce(
(f1: (Long, Float), f2: (Long, Float)) => { (f1._1, f1._2 + f2._2) },
new WrapWithWindowInfo()
)
.windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
.maxBy(2)
.addSink(sink)
// execute the pipeline and return the result
env.execute("Hourly Tips")
}