protected void emitWatermark()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/internals/KinesisDataFetcher.java [1052:1099]


	protected void emitWatermark() {
		LOG.debug("Evaluating watermark for subtask {} time {}", indexOfThisConsumerSubtask, getCurrentTimeMillis());
		long potentialWatermark = Long.MAX_VALUE;
		long potentialNextWatermark = Long.MAX_VALUE;
		long idleTime =
				(shardIdleIntervalMillis > 0)
						? getCurrentTimeMillis() - shardIdleIntervalMillis
						: Long.MAX_VALUE;

		for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
			Watermark w = e.getValue().lastEmittedRecordWatermark;
			// consider only active shards, or those that would advance the watermark
			if (w != null && (e.getValue().lastUpdated >= idleTime
					|| e.getValue().emitQueue.getSize() > 0
					|| w.getTimestamp() > lastWatermark)) {
				potentialWatermark = Math.min(potentialWatermark, w.getTimestamp());
				// for sync, use the watermark of the next record, when available
				// otherwise watermark may stall when record is blocked by synchronization
				RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue;
				RecordWrapper<T> nextRecord = q.peek();
				Watermark nextWatermark = (nextRecord != null) ? nextRecord.watermark : w;
				potentialNextWatermark = Math.min(potentialNextWatermark, nextWatermark.getTimestamp());
			}
		}

		// advance watermark if possible (watermarks can only be ascending)
		if (potentialWatermark == Long.MAX_VALUE) {
			if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
				LOG.info(
						"No active shard for subtask {}, marking the source idle.",
						indexOfThisConsumerSubtask);
				// no active shard, signal downstream operators to not wait for a watermark
				sourceContext.markAsTemporarilyIdle();
				isIdle = true;
			}
		} else {
			if (potentialWatermark > lastWatermark) {
				LOG.debug(
						"Emitting watermark {} from subtask {}",
						potentialWatermark,
						indexOfThisConsumerSubtask);
				sourceContext.emitWatermark(new Watermark(potentialWatermark));
				lastWatermark = potentialWatermark;
				isIdle = false;
			}
			nextWatermark = potentialNextWatermark;
		}
	}