in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/KinesisDataFetcher.java [1052:1099]
protected void emitWatermark() {
LOG.debug("Evaluating watermark for subtask {} time {}", indexOfThisConsumerSubtask, getCurrentTimeMillis());
long potentialWatermark = Long.MAX_VALUE;
long potentialNextWatermark = Long.MAX_VALUE;
long idleTime =
(shardIdleIntervalMillis > 0)
? getCurrentTimeMillis() - shardIdleIntervalMillis
: Long.MAX_VALUE;
for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
Watermark w = e.getValue().lastEmittedRecordWatermark;
// consider only active shards, or those that would advance the watermark
if (w != null && (e.getValue().lastUpdated >= idleTime
|| e.getValue().emitQueue.getSize() > 0
|| w.getTimestamp() > lastWatermark)) {
potentialWatermark = Math.min(potentialWatermark, w.getTimestamp());
// for sync, use the watermark of the next record, when available
// otherwise watermark may stall when record is blocked by synchronization
RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue;
RecordWrapper<T> nextRecord = q.peek();
Watermark nextWatermark = (nextRecord != null) ? nextRecord.watermark : w;
potentialNextWatermark = Math.min(potentialNextWatermark, nextWatermark.getTimestamp());
}
}
// advance watermark if possible (watermarks can only be ascending)
if (potentialWatermark == Long.MAX_VALUE) {
if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
LOG.info(
"No active shard for subtask {}, marking the source idle.",
indexOfThisConsumerSubtask);
// no active shard, signal downstream operators to not wait for a watermark
sourceContext.markAsTemporarilyIdle();
isIdle = true;
}
} else {
if (potentialWatermark > lastWatermark) {
LOG.debug(
"Emitting watermark {} from subtask {}",
potentialWatermark,
indexOfThisConsumerSubtask);
sourceContext.emitWatermark(new Watermark(potentialWatermark));
lastWatermark = potentialWatermark;
isIdle = false;
}
nextWatermark = potentialNextWatermark;
}
}