in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [1269:1323]
public void onProcessingTime(long timestamp) {
if (nextWatermark != Long.MIN_VALUE) {
long globalWatermark = lastGlobalWatermark;
if (!(isIdle && nextWatermark == propagatedLocalWatermark)) {
globalWatermark = watermarkTracker.updateWatermark(nextWatermark);
propagatedLocalWatermark = nextWatermark;
} else {
globalWatermark = watermarkTracker.updateWatermark(Long.MIN_VALUE);
LOG.info(
"WatermarkSyncCallback subtask: {} is idle",
indexOfThisConsumerSubtask);
}
if (timestamp - lastLogged > LOG_INTERVAL_MILLIS) {
lastLogged = System.currentTimeMillis();
LOG.info(
"WatermarkSyncCallback subtask: {} local watermark: {}"
+ ", global watermark: {}, delta: {} timeouts: {}, idle: {}"
+ ", emitter: {}",
indexOfThisConsumerSubtask,
nextWatermark,
globalWatermark,
nextWatermark - globalWatermark,
watermarkTracker.getUpdateTimeoutCount(),
isIdle,
recordEmitter.printInfo());
// Following is for debugging non-reproducible issue with stalled watermark
if (globalWatermark == nextWatermark
&& globalWatermark == lastGlobalWatermark
&& stalledWatermarkIntervalCount++ > 5) {
// subtask blocks watermark, log to aid troubleshooting
stalledWatermarkIntervalCount = 0;
for (Map.Entry<Integer, ShardWatermarkState> e :
shardWatermarks.entrySet()) {
RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue;
RecordWrapper<T> nextRecord = q.peek();
if (nextRecord != null) {
LOG.info(
"stalled watermark {} key {} next watermark {} next timestamp {}",
nextWatermark,
e.getKey(),
nextRecord.watermark,
nextRecord.timestamp);
}
}
}
}
lastGlobalWatermark = globalWatermark;
recordEmitter.setCurrentWatermark(globalWatermark);
}
// schedule next callback
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
}