in long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala [49:72]
/** Assembles the long-ride alert pipeline and runs it.
  *
  * Rides read from `source` are assigned event-time timestamps and watermarks,
  * keyed by ride id, passed through [[AlertFunction]], and written to `sink`.
  *
  * @return the result of executing the job named "Long Taxi Rides"
  */
def execute(): JobExecutionResult = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Attach the configured source to the environment.
  val rideStream = env.addSource(source)

  // Timestamps come from the ride itself; the timestamp already attached to
  // the stream record is not used.
  val eventTimeAssigner: SerializableTimestampAssigner[TaxiRide] =
    new SerializableTimestampAssigner[TaxiRide] {
      override def extractTimestamp(ride: TaxiRide, streamRecordTimestamp: Long): Long =
        ride.getEventTimeMillis
    }

  // Watermarks are generated with a 60 second bound on out-of-orderness.
  val maxOutOfOrderness = Duration.ofSeconds(60)
  val strategy = WatermarkStrategy
    .forBoundedOutOfOrderness[TaxiRide](maxOutOfOrderness)
    .withTimestampAssigner(eventTimeAssigner)

  // Wire the stages one at a time: watermarks -> keyed processing -> sink.
  val timestampedRides = rideStream.assignTimestampsAndWatermarks(strategy)
  val alerts = timestampedRides
    .keyBy(ride => ride.rideId)
    .process(new AlertFunction())
  alerts.addSink(sink)

  // Run the job and hand its result back to the caller.
  env.execute("Long Taxi Rides")
}