in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java [182:266]
protected void emitRecords() {
// emit available records, ordered by timestamp
AsyncRecordQueue<T> min = heads.poll();
runLoop:
while (running) {
// find a queue to emit from
while (min == null) {
// check new or previously empty queues
if (!emptyQueues.isEmpty()) {
for (AsyncRecordQueue<T> queueHead : emptyQueues.keySet()) {
if (!queueHead.queue.isEmpty()) {
emptyQueues.remove(queueHead);
queueHead.headTimestamp = queueHead.queue.peek().getTimestamp();
heads.offer(queueHead);
}
}
}
min = heads.poll();
if (min == null) {
synchronized (condition) {
// wait for work
try {
condition.wait(idleSleepMillis);
} catch (InterruptedException e) {
continue runLoop;
}
}
}
if (!running) {
// Make sure we can exit this loop so the thread can shut down
break runLoop;
}
}
// wait until ready to emit min or another queue receives elements
while (min.headTimestamp > maxEmitTimestamp) {
synchronized (condition) {
// wait until ready to emit
try {
condition.wait(idleSleepMillis);
} catch (InterruptedException e) {
continue runLoop;
}
if (min.headTimestamp > maxEmitTimestamp && !emptyQueues.isEmpty()) {
// see if another queue can make progress
heads.offer(min);
min = null;
continue runLoop;
}
}
if (!running) {
// Make sure we can exit this loop so the thread can shut down
break runLoop;
}
}
// emit up to queue capacity records
// cap on empty queues since older records may arrive
AsyncRecordQueue<T> nextQueue = heads.poll();
T record;
int emitCount = 0;
while ((record = min.queue.poll()) != null) {
emit(record, min);
// track last record emitted, even when queue becomes empty
min.headTimestamp = record.getTimestamp();
// potentially switch to next queue
if ((nextQueue != null && min.headTimestamp > nextQueue.headTimestamp)
|| (min.headTimestamp > maxEmitTimestamp)
|| (emitCount++ > queueCapacity && !emptyQueues.isEmpty())) {
break;
}
}
if (record == null) {
this.emptyQueues.put(min, true);
} else if (nextQueue != null && nextQueue.headTimestamp > min.headTimestamp) {
// if we stopped emitting due to reaching max timestamp,
// the next queue may not be the new min
heads.offer(nextQueue);
nextQueue = min;
} else {
heads.offer(min);
}
min = nextQueue;
}
}