def execute()

in long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala [49:72]


    /**
     * Assembles the "Long Taxi Rides" streaming job and runs it.
     *
     * The job reads rides from `source`, stamps them with event time taken from
     * `getEventTimeMillis`, groups them by `rideId`, passes each keyed stream through an
     * [[AlertFunction]] and forwards whatever that function emits to `sink`.
     *
     * NOTE(review): `source` and `sink` are declared outside this method — presumably on the
     * enclosing job class; confirm against the surrounding file.
     *
     * @return the result handle produced by running the job on the execution environment
     */
    def execute(): JobExecutionResult = {
      val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

      // attach the ride source to the environment
      val rideStream = streamEnv.addSource(source)

      // reads the event-time timestamp straight off each ride; the timestamp already
      // attached to the stream record is ignored
      val eventTimeAssigner: SerializableTimestampAssigner[TaxiRide] =
        new SerializableTimestampAssigner[TaxiRide] {
          override def extractTimestamp(element: TaxiRide, recordTimestamp: Long): Long =
            element.getEventTimeMillis
        }

      // watermarks tolerate events arriving up to 60 seconds out of order
      val boundedOutOfOrder = WatermarkStrategy
        .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60))
        .withTimestampAssigner(eventTimeAssigner)

      // wire up the stages one at a time: timestamps -> key by ride -> alert function -> sink
      val timestampedRides = rideStream.assignTimestampsAndWatermarks(boundedOutOfOrder)
      val ridesById = timestampedRides.keyBy(ride => ride.rideId)
      val alerts = ridesById.process(new AlertFunction())
      alerts.addSink(sink)

      // run the assembled job; the value of this call is the method's result
      streamEnv.execute("Long Taxi Rides")
    }