in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [1143:1203]
protected void emitWatermark() {
    // Evaluates the watermark for this subtask: takes the minimum of the last emitted record
    // watermarks over the qualifying shards, emits it when it is ahead of the last emitted
    // watermark, and marks the source temporarily idle when no shard qualifies.
    LOG.debug(
            "Evaluating watermark for subtask {} time {}",
            indexOfThisConsumerSubtask,
            getCurrentTimeMillis());

    long minShardWatermark = Long.MAX_VALUE;
    long minUpcomingWatermark = Long.MAX_VALUE;

    // Shards whose lastUpdated is at or after this cutoff count as recently active. Without a
    // positive idle interval the cutoff stays at Long.MAX_VALUE, so the recency check is not
    // expected to pass and only the other two criteria below can qualify a shard.
    long idleCutoff = Long.MAX_VALUE;
    if (shardIdleIntervalMillis > 0) {
        idleCutoff = getCurrentTimeMillis() - shardIdleIntervalMillis;
    }

    for (Map.Entry<Integer, ShardWatermarkState> entry : shardWatermarks.entrySet()) {
        ShardWatermarkState state = entry.getValue();
        Watermark shardWatermark = state.lastEmittedRecordWatermark;
        if (shardWatermark == null) {
            // this shard has no emitted record watermark to contribute
            continue;
        }

        // a shard qualifies when it was recently updated, still has queued records, or its
        // watermark is ahead of the one last emitted by this subtask
        boolean qualifies =
                state.lastUpdated >= idleCutoff
                        || state.emitQueue.getSize() > 0
                        || shardWatermark.getTimestamp() > lastWatermark;
        if (!qualifies) {
            continue;
        }

        minShardWatermark = Math.min(minShardWatermark, shardWatermark.getTimestamp());

        // for sync, prefer the watermark of the record at the head of the queue when there is
        // one; otherwise the watermark may stall while that record is blocked by
        // synchronization
        RecordEmitter.RecordQueue<RecordWrapper<T>> queue = state.emitQueue;
        RecordWrapper<T> head = queue.peek();
        Watermark upcoming = shardWatermark;
        if (head != null) {
            upcoming = head.watermark;
        }
        minUpcomingWatermark = Math.min(minUpcomingWatermark, upcoming.getTimestamp());
    }

    LOG.debug(
            "WatermarkEmitter subtask: {}, last watermark: {}, potential watermark: {}"
                    + ", potential next watermark: {}",
            indexOfThisConsumerSubtask,
            lastWatermark,
            minShardWatermark,
            minUpcomingWatermark);

    if (minShardWatermark == Long.MAX_VALUE) {
        // no shard qualified; go idle only when there are no shards at all or an idle interval
        // is configured, and leave lastWatermark / nextWatermark untouched either way
        if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
            LOG.info(
                    "No active shard for subtask {}, marking the source idle.",
                    indexOfThisConsumerSubtask);
            // signal downstream operators to not wait for a watermark from this subtask
            sourceContext.markAsTemporarilyIdle();
            isIdle = true;
        }
        return;
    }

    // watermarks can only be ascending, so emit only when strictly ahead of the last one
    if (minShardWatermark > lastWatermark) {
        LOG.debug(
                "Emitting watermark {} from subtask {}",
                minShardWatermark,
                indexOfThisConsumerSubtask);
        sourceContext.emitWatermark(new Watermark(minShardWatermark));
        lastWatermark = minShardWatermark;
        isIdle = false;
    }

    // recorded whenever at least one shard qualified, even if nothing was emitted
    nextWatermark = minUpcomingWatermark;
}