protected void emitRecords()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java [182:266]


    /**
     * Main emit loop: repeatedly selects a queue from {@code heads}, waits until its head
     * timestamp is no greater than {@code maxEmitTimestamp}, and then drains records from it
     * via {@link #emit}, switching queues when another queue should go first.
     *
     * <p>The loop runs until {@code running} becomes false. While there is nothing to emit,
     * the calling thread blocks on {@code condition} for at most {@code idleSleepMillis} per
     * wait, so a change of {@code running} is observed after at most one wait interval (or
     * sooner, if the monitor is notified or the thread is interrupted).
     *
     * <p>Queues that have been fully drained are parked in {@code emptyQueues} and are moved
     * back into {@code heads} once they are observed to hold records again.
     *
     * <p>NOTE(review): {@code heads} is presumably a priority queue ordered by
     * {@code headTimestamp}, and {@code emptyQueues} presumably a concurrent map that producer
     * threads also touch; neither declaration is visible here, so confirm against the fields.
     */
    protected void emitRecords() {
        // emit available records, ordered by timestamp
        AsyncRecordQueue<T> min = heads.poll();
        runLoop:
        while (running) {
            // find a queue to emit from
            while (min == null) {
                // check new or previously empty queues
                if (!emptyQueues.isEmpty()) {
                    // NOTE(review): entries are removed while iterating keySet(); this is only
                    // safe if emptyQueues is a concurrent map (weakly consistent iterator) —
                    // confirm against the field declaration.
                    for (AsyncRecordQueue<T> queueHead : emptyQueues.keySet()) {
                        if (!queueHead.queue.isEmpty()) {
                            // queue has records again: refresh its sort key from the current
                            // head record and make it eligible for selection
                            emptyQueues.remove(queueHead);
                            queueHead.headTimestamp = queueHead.queue.peek().getTimestamp();
                            heads.offer(queueHead);
                        }
                    }
                }
                min = heads.poll();
                if (min == null) {
                    synchronized (condition) {
                        // wait for work
                        try {
                            condition.wait(idleSleepMillis);
                        } catch (InterruptedException e) {
                            // restart the outer loop so that 'running' is re-checked; the
                            // interrupt status is not restored here
                            continue runLoop;
                        }
                    }
                }
                if (!running) {
                    // Make sure we can exit this loop so the thread can shut down
                    break runLoop;
                }
            }

            // wait until ready to emit min or another queue receives elements
            while (min.headTimestamp > maxEmitTimestamp) {
                synchronized (condition) {
                    // wait until ready to emit
                    try {
                        condition.wait(idleSleepMillis);
                    } catch (InterruptedException e) {
                        // min is kept; the outer loop re-checks 'running' and, since min is
                        // non-null, comes straight back to this wait loop
                        continue runLoop;
                    }
                    if (min.headTimestamp > maxEmitTimestamp && !emptyQueues.isEmpty()) {
                        // see if another queue can make progress
                        // (min goes back into heads; clearing it forces the outer loop to
                        // rescan emptyQueues and pick a new minimum)
                        heads.offer(min);
                        min = null;
                        continue runLoop;
                    }
                }
                if (!running) {
                    // Make sure we can exit this loop so the thread can shut down
                    break runLoop;
                }
            }

            // emit up to queue capacity records
            // cap on empty queues since older records may arrive
            // nextQueue is the runner-up taken out of heads; it is either promoted to min or
            // offered back to heads below
            AsyncRecordQueue<T> nextQueue = heads.poll();
            T record;
            int emitCount = 0;
            while ((record = min.queue.poll()) != null) {
                emit(record, min);
                // track last record emitted, even when queue becomes empty
                min.headTimestamp = record.getTimestamp();
                // potentially switch to next queue
                // Stop draining when (a) the runner-up now has the smaller timestamp, (b) the
                // emit limit was passed, or (c) the per-turn cap was exceeded while some queue
                // is parked as empty. Because of the post-increment and the strict '>', (c)
                // first trips after queueCapacity + 2 records. emitCount is only incremented
                // when (a) and (b) are both false (short-circuit evaluation).
                if ((nextQueue != null && min.headTimestamp > nextQueue.headTimestamp)
                        || (min.headTimestamp > maxEmitTimestamp)
                        || (emitCount++ > queueCapacity && !emptyQueues.isEmpty())) {
                    break;
                }
            }
            if (record == null) {
                // the drain loop ended because poll() returned null: park the queue as empty
                // instead of returning it to heads
                this.emptyQueues.put(min, true);
            } else if (nextQueue != null && nextQueue.headTimestamp > min.headTimestamp) {
                // if we stopped emitting due to reaching max timestamp,
                // the next queue may not be the new min
                // (so put the runner-up back and keep working on the current queue)
                heads.offer(nextQueue);
                nextQueue = min;
            } else {
                // the current queue still has records but must yield: return it to heads
                heads.offer(min);
            }
            // may be null, in which case the next iteration searches for a queue again
            min = nextQueue;
        }
    }