def execute()

in hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala [53:82]


    /**
     * Assembles the "Hourly Tips" streaming job and runs it.
     *
     * The job reads fares from `source`, sums the tips of each driver over one-hour
     * tumbling event-time windows, then selects one record per hour across all drivers
     * with `maxBy(2)` and writes it to `sink`.
     *
     * @return the result handle returned by executing the job
     */
    def execute(): JobExecutionResult = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      // Event time is read straight from each fare.
      val timestampAssigner: SerializableTimestampAssigner[TaxiFare] =
        new SerializableTimestampAssigner[TaxiFare] {
          override def extractTimestamp(element: TaxiFare, recordTimestamp: Long): Long =
            element.getEventTimeMillis
        }

      // The taxi fare stream is in order, by timestamp, so monotonous watermarks suffice.
      val watermarkStrategy = WatermarkStrategy
        .forMonotonousTimestamps[TaxiFare]()
        .withTimestampAssigner(timestampAssigner)

      // One-hour tumbling event-time window; a fresh assigner is built for each use.
      def hourlyWindow = TumblingEventTimeWindows.of(Time.hours(1))

      // Stage 1: timestamped fares reduced to (driverId, tip) pairs.
      val tipsByDriver = env
        .addSource(source)
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .map((fare: TaxiFare) => (fare.driverId, fare.tip))

      // Stage 2: per-driver hourly tip totals. The running sum is pre-aggregated and the
      // final value of each window is handed to WrapWithWindowInfo.
      val hourlyTotals = tipsByDriver
        .keyBy(_._1)
        .window(hourlyWindow)
        .reduce(
          (acc: (Long, Float), next: (Long, Float)) => (acc._1, acc._2 + next._2),
          new WrapWithWindowInfo()
        )

      // Stage 3: across all drivers, keep the record with the largest value at tuple
      // position 2 for each hour.
      // NOTE(review): position 2 is presumably the summed tips in the tuple emitted by
      // WrapWithWindowInfo - confirm against that class.
      hourlyTotals
        .windowAll(hourlyWindow)
        .maxBy(2)
        .addSink(sink)

      // Run the assembled pipeline and hand back its result.
      env.execute("Hourly Tips")
    }