in long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java [114:148]
/**
 * Pairs up the two events of a ride and emits the ride id when the ride lasted too long.
 *
 * <p>The first event seen (no event held in {@code rideState}) is stored; if it is a START
 * event, an event-time timer is registered at {@code getTimerTime(ride)}. When the second
 * event arrives, the pair is checked with {@code rideTooLong}, the ride id is emitted if the
 * check succeeds, and the stored event is cleared.
 *
 * <p>Note: this solution can leak state if an event is missing; see DISCUSSION.md for more
 * information.
 *
 * @param ride the incoming ride event
 * @param context gives access to the timer service
 * @param out receives the id of each ride found to be too long
 * @throws Exception if state access or one of the helper calls fails
 */
public void processElement(TaxiRide ride, Context context, Collector<Long> out)
        throws Exception {
    final TaxiRide storedEvent = rideState.value();

    // Nothing stored yet: this is the first event for the ride, whichever kind it is.
    if (storedEvent == null) {
        rideState.update(ride);
        if (ride.isStart) {
            // The timer lets us catch rides that run too long before their END event
            // shows up (or whose END event never shows up at all).
            context.timerService().registerEventTimeTimer(getTimerTime(ride));
        }
        return;
    }

    // Second event of the pair: decide whether the ride exceeded the limit.
    final boolean tooLong;
    if (ride.isStart) {
        // The incoming event is the START, so it is passed first and the stored event second.
        tooLong = rideTooLong(ride, storedEvent);
    } else {
        // The stored event was a START, so its timer exists unless it has already fired.
        context.timerService().deleteEventTimeTimer(getTimerTime(storedEvent));
        // The ride may have gone on too long even though the timer has not fired yet.
        tooLong = rideTooLong(storedEvent, ride);
    }

    if (tooLong) {
        out.collect(ride.rideId);
    }

    // Both events have been seen, so the stored event is no longer needed.
    // (State can still leak if one of the two events is missing; see DISCUSSION.md.)
    rideState.clear();
}