protected void emitRecords()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/util/RecordEmitter.java [182:266]


	/**
	 * Main emit loop: repeatedly picks a queue from {@code heads}, drains records from it through
	 * {@code emit(record, queue)}, and returns only after {@code running} is observed to be false.
	 *
	 * <p>Each pass of the outer loop goes through three phases:
	 * <ol>
	 *   <li><b>Select</b> - while no queue is held, every queue in {@code emptyQueues} that has
	 *       received elements is moved back into {@code heads} (its {@code headTimestamp} is
	 *       refreshed from the element at the front), then {@code heads} is polled. If nothing is
	 *       available the thread waits on {@code condition} for at most {@code idleSleepMillis}.</li>
	 *   <li><b>Hold back</b> - while the selected queue's {@code headTimestamp} is greater than
	 *       {@code maxEmitTimestamp}, the thread waits on {@code condition}. If the queue is still
	 *       ahead after a wait and {@code emptyQueues} is not empty, the queue is returned to
	 *       {@code heads} and selection starts over.</li>
	 *   <li><b>Drain</b> - records are polled from the selected queue and emitted until the queue
	 *       is empty or one of the switch conditions holds. The queue is then either parked in
	 *       {@code emptyQueues} or offered back to {@code heads}.</li>
	 * </ol>
	 *
	 * <p>NOTE(review): presumably {@code heads} orders queues by {@code headTimestamp} and other
	 * threads fill the queues and notify {@code condition}; neither is visible in this method -
	 * confirm against the field declarations and the producers.
	 */
	protected void emitRecords() {
		// emit available records, ordered by timestamp
		AsyncRecordQueue<T> min = heads.poll();
		runLoop:
		while (running) {
			// find a queue to emit from
			while (min == null) {
				// check new or previously empty queues
				if (!emptyQueues.isEmpty()) {
					// NOTE(review): entries are removed from emptyQueues while its keySet() is being
					// iterated. This is only safe if emptyQueues is a concurrent map with weakly
					// consistent iterators - confirm against the field declaration.
					for (AsyncRecordQueue<T> queueHead : emptyQueues.keySet()) {
						if (!queueHead.queue.isEmpty()) {
							emptyQueues.remove(queueHead);
							// refresh the ordering key from the element now at the front before re-inserting
							queueHead.headTimestamp = queueHead.queue.peek().getTimestamp();
							heads.offer(queueHead);
						}
					}
				}
				min = heads.poll();
				if (min == null) {
					synchronized (condition) {
						// wait for work
						try {
							// bounded wait: returns after idleSleepMillis even without a notification
							condition.wait(idleSleepMillis);
						} catch (InterruptedException e) {
							// Restart at the running check of the outer loop.
							// NOTE(review): the interrupt status is not restored here, so an interrupt
							// alone does not stop this loop; only running == false does.
							continue runLoop;
						}
					}
				}
				if (!running) {
					// Make sure we can exit this loop so the thread can shut down
					break runLoop;
				}
			}

			// wait until ready to emit min or another queue receives elements
			while (min.headTimestamp > maxEmitTimestamp) {
				synchronized (condition) {
					// wait until ready to emit
					try {
						condition.wait(idleSleepMillis);
					} catch (InterruptedException e) {
						// min is kept as-is; the outer loop skips selection and re-enters this wait
						// loop if min is still ahead of maxEmitTimestamp.
						// NOTE(review): interrupt status is not restored (same as in the wait above).
						continue runLoop;
					}
					if (min.headTimestamp > maxEmitTimestamp && !emptyQueues.isEmpty()) {
						// see if another queue can make progress
						// min goes back into heads so that it takes part in the next selection
						heads.offer(min);
						min = null;
						continue runLoop;
					}
				}
				if (!running) {
					// Make sure we can exit this loop so the thread can shut down
					break runLoop;
				}
			}

			// emit up to queue capacity records
			// cap on empty queues since older records may arrive
			// nextQueue is taken out of heads up front; it is either handed over as the next min
			// or offered back to heads after the drain loop below.
			AsyncRecordQueue<T> nextQueue = heads.poll();
			T record;
			int emitCount = 0;
			while ((record = min.queue.poll()) != null) {
				emit(record, min);
				// track last record emitted, even when queue becomes empty
				min.headTimestamp = record.getTimestamp();
				// potentially switch to next queue
				// Stop draining when (a) the record just emitted is newer than the head of nextQueue,
				// (b) it is newer than maxEmitTimestamp, or (c) the emit counter exceeded
				// queueCapacity while some queue is parked in emptyQueues.
				// NOTE(review): because of the post-increment combined with '>', (c) can first hold
				// after queueCapacity + 2 records, not after queueCapacity - confirm this is intended.
				if ((nextQueue != null && min.headTimestamp > nextQueue.headTimestamp)
					|| (min.headTimestamp > maxEmitTimestamp)
					|| (emitCount++ > queueCapacity && !emptyQueues.isEmpty())) {
					break;
				}
			}
			if (record == null) {
				// the drain loop ended because min.queue was empty: park min until it has elements again
				this.emptyQueues.put(min, true);
			} else if (nextQueue != null && nextQueue.headTimestamp > min.headTimestamp) {
				// if we stopped emitting due to reaching max timestamp,
				// the next queue may not be the new min
				// nextQueue goes back into heads and min stays selected for the next pass
				heads.offer(nextQueue);
				nextQueue = min;
			} else {
				// min still holds records but lost its turn: offer it back for a later selection
				heads.offer(min);
			}
			// may be null, in which case the next pass starts with the selection phase
			min = nextQueue;
		}
	}