override def processElement()

in long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala [93:130]


    override def processElement(
        ride: TaxiRide,
        context: KeyedProcessFunction[Long, TaxiRide, Long]#Context,
        out: Collector[Long]
    ): Unit = {

      val firstRideEvent: TaxiRide = rideState.value

      if (firstRideEvent == null) {
        // whatever event comes first, remember it
        rideState.update(ride)

        if (ride.isStart) {
          // we will use this timer to check for rides that have gone on too long and may
          // not yet have an END event (or the END event could be missing)
          context.timerService.registerEventTimeTimer(getTimerTime(ride))
        }
      } else {
        if (ride.isStart) {
          if (rideTooLong(ride, firstRideEvent)) {
            out.collect(ride.rideId)
          }
        } else {
          // the first ride was a START event, so there is a timer unless it has fired
          context.timerService.deleteEventTimeTimer(getTimerTime(firstRideEvent))

          // perhaps the ride has gone on too long, but the timer didn't fire yet
          if (rideTooLong(firstRideEvent, ride)) {
            out.collect(ride.rideId)
          }
        }

        // both events have now been seen, we can clear the state
        // this solution can leak state if an event is missing
        // see DISCUSSION.md for more information
        rideState.clear()
      }
    }