public void run()

in common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java [41:80]


    /**
     * Emits taxi ride events in batches until {@code running} becomes false.
     *
     * <p>Each iteration creates {@code BATCH_SIZE} START events and queues the matching END
     * events, releases every queued END event whose event time is not after the latest START
     * time seen so far, then emits the new START events in a shuffled order and sleeps before
     * producing the next batch.
     *
     * @param ctx the source context that every generated event is collected into
     * @throws Exception if the thread is interrupted while sleeping, or if collecting fails
     */
    public void run(SourceContext<TaxiRide> ctx) throws Exception {

        PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100);
        long id = 0;
        long maxStartTime = 0;

        while (running) {

            // generate a batch of START events
            List<TaxiRide> startEvents = new ArrayList<>(BATCH_SIZE);
            for (int i = 1; i <= BATCH_SIZE; i++) {
                TaxiRide ride = new TaxiRide(id + i, true);
                startEvents.add(ride);
                // the start times may be in order, but let's not assume that
                maxStartTime = Math.max(maxStartTime, ride.getEventTimeMillis());
            }

            // enqueue the corresponding END events
            for (int i = 1; i <= BATCH_SIZE; i++) {
                endEventQ.add(new TaxiRide(id + i, false));
            }

            // release the END events coming before the end of this new batch
            // (this allows a few END events to precede their matching START event);
            // peek() returns null on an empty queue, so stop once it has been drained
            while (!endEventQ.isEmpty()
                    && endEventQ.peek().getEventTimeMillis() <= maxStartTime) {
                TaxiRide ride = endEventQ.poll();
                ctx.collect(ride);
            }

            // then emit the new START events (out-of-order); seeding the shuffle with the
            // batch's base id makes the emitted order reproducible from run to run
            java.util.Collections.shuffle(startEvents, new Random(id));
            startEvents.forEach(ctx::collect);

            // prepare for the next batch
            id += BATCH_SIZE;

            // don't go too fast
            Thread.sleep(BATCH_SIZE * SLEEP_MILLIS_PER_EVENT);
        }
    }