in long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala [93:130]
override def processElement(
ride: TaxiRide,
context: KeyedProcessFunction[Long, TaxiRide, Long]#Context,
out: Collector[Long]
): Unit = {
val firstRideEvent: TaxiRide = rideState.value
if (firstRideEvent == null) {
// whatever event comes first, remember it
rideState.update(ride)
if (ride.isStart) {
// we will use this timer to check for rides that have gone on too long and may
// not yet have an END event (or the END event could be missing)
context.timerService.registerEventTimeTimer(getTimerTime(ride))
}
} else {
if (ride.isStart) {
if (rideTooLong(ride, firstRideEvent)) {
out.collect(ride.rideId)
}
} else {
// the first ride was a START event, so there is a timer unless it has fired
context.timerService.deleteEventTimeTimer(getTimerTime(firstRideEvent))
// perhaps the ride has gone on too long, but the timer didn't fire yet
if (rideTooLong(firstRideEvent, ride)) {
out.collect(ride.rideId)
}
}
// both events have now been seen, we can clear the state
// this solution can leak state if an event is missing
// see DISCUSSION.md for more information
rideState.clear()
}
}