public void onProcessingTime(long timestamp)

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/KinesisDataFetcher.java [1169:1217]


		/**
		 * Timer callback that publishes this subtask's local watermark to the watermark tracker,
		 * hands the resulting global watermark to the record emitter, and re-registers itself
		 * with the timer service.
		 *
		 * <p>While {@code nextWatermark} is still {@code Long.MIN_VALUE} nothing is published or
		 * logged; the next timer is registered regardless.
		 *
		 * @param timestamp processing-time value passed to this callback; used to throttle the
		 *     periodic status log to one entry per {@code LOG_INTERVAL_MILLIS}
		 */
		public void onProcessingTime(long timestamp) {
			if (nextWatermark != Long.MIN_VALUE) {
				// Default to the previously observed global watermark; it is only replaced when the
				// tracker is actually updated below.
				long globalWatermark = lastGlobalWatermark;
				// TODO: refresh watermark while idle
				// Skip the tracker update only when the subtask is idle AND its local watermark has
				// already been propagated.
				if (!(isIdle && nextWatermark == propagatedLocalWatermark)) {
					globalWatermark = watermarkTracker.updateWatermark(nextWatermark);
					propagatedLocalWatermark = nextWatermark;
				} else {
					LOG.info("WatermarkSyncCallback subtask: {} is idle", indexOfThisConsumerSubtask);
				}

				if (timestamp - lastLogged > LOG_INTERVAL_MILLIS) {
					// Remember the callback's own timestamp rather than System.currentTimeMillis()
					// so that the interval check above always compares values from the same clock.
					lastLogged = timestamp;
					LOG.info(
							"WatermarkSyncCallback subtask: {} local watermark: {}"
									+ ", global watermark: {}, delta: {} timeouts: {}, emitter: {}",
							indexOfThisConsumerSubtask,
							nextWatermark,
							globalWatermark,
							nextWatermark - globalWatermark,
							watermarkTracker.getUpdateTimeoutCount(),
							recordEmitter.printInfo());

					// Following is for debugging non-reproducible issue with stalled watermark
					// The counter is incremented only on log intervals where the global watermark
					// equals both the local watermark and the previous global watermark; the dump
					// fires once more than five such intervals have been counted.
					// NOTE(review): the counter is reset only after a dump, not when the watermark
					// advances, so non-consecutive stalled intervals accumulate - confirm intended.
					if (globalWatermark == nextWatermark && globalWatermark == lastGlobalWatermark
							&& stalledWatermarkIntervalCount++ > 5) {
						// subtask blocks watermark, log to aid troubleshooting
						stalledWatermarkIntervalCount = 0;
						// Log the head record of every shard queue that currently has one.
						for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
							RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue;
							RecordWrapper<T> nextRecord = q.peek();
							if (nextRecord != null) {
								LOG.info(
										"stalled watermark {} key {} next watermark {} next timestamp {}",
										nextWatermark,
										e.getKey(),
										nextRecord.watermark,
										nextRecord.timestamp);
							}
						}
					}
				}

				lastGlobalWatermark = globalWatermark;
				recordEmitter.setCurrentWatermark(globalWatermark);
			}
			// schedule next callback
			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
		}