in common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java [41:80]
// Emits an endless stream of TaxiRide events until `running` becomes false.
//
// Rides are produced in batches of BATCH_SIZE consecutive ids. For every batch
// the START events are created first, the matching END events are queued, and
// then (a) all queued END events whose event time is not later than the newest
// START time seen so far are emitted, followed by (b) the batch's START events
// in shuffled order. The loop then sleeps so the stream is throttled to roughly
// one event per SLEEP_MILLIS_PER_EVENT.
//
// ctx: the source context every generated event is handed to via collect().
// Throws: whatever ctx.collect() raises, or InterruptedException from the
// throttling sleep.
public void run(SourceContext<TaxiRide> ctx) throws Exception {
    // Pending END events; the queue's ordering is whatever TaxiRide's natural
    // ordering defines (presumably event time -- confirm against TaxiRide).
    PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100);
    long id = 0;
    long maxStartTime = 0;

    while (running) {
        // generate a batch of START events
        List<TaxiRide> startEvents = new ArrayList<>(BATCH_SIZE);
        for (int i = 1; i <= BATCH_SIZE; i++) {
            TaxiRide ride = new TaxiRide(id + i, true);
            startEvents.add(ride);
            // the start times may be in order, but let's not assume that
            maxStartTime = Math.max(maxStartTime, ride.getEventTimeMillis());
        }

        // enqueue the corresponding END events
        for (int i = 1; i <= BATCH_SIZE; i++) {
            endEventQ.add(new TaxiRide(id + i, false));
        }

        // release the END events coming before the end of this new batch
        // (this allows a few END events to precede their matching START event)
        //
        // The emptiness check matters: peek() returns null once the queue has
        // been drained, which happens if every pending END event is due.
        while (!endEventQ.isEmpty()
                && endEventQ.peek().getEventTimeMillis() <= maxStartTime) {
            ctx.collect(endEventQ.poll());
        }

        // then emit the new START events (out-of-order); seeding the shuffle
        // with the batch's base id keeps the ordering reproducible across runs
        java.util.Collections.shuffle(startEvents, new Random(id));
        startEvents.forEach(ctx::collect);

        // prepare for the next batch
        id += BATCH_SIZE;

        // don't go too fast
        Thread.sleep(BATCH_SIZE * SLEEP_MILLIS_PER_EVENT);
    }
}