public void onProcessingTime(long timestamp)

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [1269:1323]


        /**
         * Timer callback that reports this subtask's local watermark to the watermark tracker,
         * passes the value the tracker returns on to the record emitter, and then registers
         * itself with the timer service for the next firing.
         *
         * <p>While {@code nextWatermark} is still {@code Long.MIN_VALUE} nothing is reported
         * or logged; the follow-up timer is registered in every case.
         *
         * @param timestamp time supplied for this firing; only compared against
         *     {@code lastLogged} to throttle the periodic status log
         */
        public void onProcessingTime(long timestamp) {
            if (nextWatermark != Long.MIN_VALUE) {
                final long syncedWatermark;
                if (isIdle && nextWatermark == propagatedLocalWatermark) {
                    // Idle, and the local watermark was already propagated: report
                    // Long.MIN_VALUE to the tracker instead of the local value.
                    // NOTE(review): presumably this stops an idle subtask from holding
                    // back the global watermark - confirm against the tracker contract.
                    syncedWatermark = watermarkTracker.updateWatermark(Long.MIN_VALUE);
                    LOG.info(
                            "WatermarkSyncCallback subtask: {} is idle",
                            indexOfThisConsumerSubtask);
                } else {
                    syncedWatermark = watermarkTracker.updateWatermark(nextWatermark);
                    propagatedLocalWatermark = nextWatermark;
                }

                final boolean statusLogDue = timestamp - lastLogged > LOG_INTERVAL_MILLIS;
                if (statusLogDue) {
                    lastLogged = System.currentTimeMillis();
                    LOG.info(
                            "WatermarkSyncCallback subtask: {} local watermark: {}"
                                    + ", global watermark: {}, delta: {} timeouts: {}, idle: {}"
                                    + ", emitter: {}",
                            indexOfThisConsumerSubtask,
                            nextWatermark,
                            syncedWatermark,
                            nextWatermark - syncedWatermark,
                            watermarkTracker.getUpdateTimeoutCount(),
                            isIdle,
                            recordEmitter.printInfo());

                    // Diagnostics for a non-reproducible stalled-watermark issue. The counter
                    // is only bumped when the tracker's value equals both the local watermark
                    // and the value seen on the previous firing; it is reset once it has
                    // exceeded the threshold and the per-shard dump below has been triggered.
                    final boolean watermarkUnchanged =
                            syncedWatermark == nextWatermark
                                    && syncedWatermark == lastGlobalWatermark;
                    if (watermarkUnchanged && stalledWatermarkIntervalCount++ > 5) {
                        // this subtask appears to block the watermark; log the head record
                        // of every non-empty shard queue to aid troubleshooting
                        stalledWatermarkIntervalCount = 0;
                        for (Map.Entry<Integer, ShardWatermarkState> shardEntry :
                                shardWatermarks.entrySet()) {
                            RecordEmitter.RecordQueue<RecordWrapper<T>> pendingRecords =
                                    shardEntry.getValue().emitQueue;
                            RecordWrapper<T> head = pendingRecords.peek();
                            if (head == null) {
                                continue;
                            }
                            LOG.info(
                                    "stalled watermark {} key {} next watermark {} next timestamp {}",
                                    nextWatermark,
                                    shardEntry.getKey(),
                                    head.watermark,
                                    head.timestamp);
                        }
                    }
                }

                // remember the tracker's value for the next firing, then publish it
                lastGlobalWatermark = syncedWatermark;
                recordEmitter.setCurrentWatermark(syncedWatermark);
            }

            // schedule next callback
            final long nextFiring = timerService.getCurrentProcessingTime() + interval;
            timerService.registerTimer(nextFiring, this);
        }