in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/KinesisDataFetcher.java [1169:1217]
public void onProcessingTime(long timestamp) {
if (nextWatermark != Long.MIN_VALUE) {
long globalWatermark = lastGlobalWatermark;
// TODO: refresh watermark while idle
if (!(isIdle && nextWatermark == propagatedLocalWatermark)) {
globalWatermark = watermarkTracker.updateWatermark(nextWatermark);
propagatedLocalWatermark = nextWatermark;
} else {
LOG.info("WatermarkSyncCallback subtask: {} is idle", indexOfThisConsumerSubtask);
}
if (timestamp - lastLogged > LOG_INTERVAL_MILLIS) {
lastLogged = System.currentTimeMillis();
LOG.info(
"WatermarkSyncCallback subtask: {} local watermark: {}"
+ ", global watermark: {}, delta: {} timeouts: {}, emitter: {}",
indexOfThisConsumerSubtask,
nextWatermark,
globalWatermark,
nextWatermark - globalWatermark,
watermarkTracker.getUpdateTimeoutCount(),
recordEmitter.printInfo());
// Following is for debugging non-reproducible issue with stalled watermark
if (globalWatermark == nextWatermark && globalWatermark == lastGlobalWatermark
&& stalledWatermarkIntervalCount++ > 5) {
// subtask blocks watermark, log to aid troubleshooting
stalledWatermarkIntervalCount = 0;
for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue;
RecordWrapper<T> nextRecord = q.peek();
if (nextRecord != null) {
LOG.info(
"stalled watermark {} key {} next watermark {} next timestamp {}",
nextWatermark,
e.getKey(),
nextRecord.watermark,
nextRecord.timestamp);
}
}
}
}
lastGlobalWatermark = globalWatermark;
recordEmitter.setCurrentWatermark(globalWatermark);
}
// schedule next callback
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
}