protected void emitWatermark()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [1143:1203]


    /**
     * Evaluates the per-shard watermark states and advances this subtask's watermark when
     * possible.
     *
     * <p>A shard contributes only if it has a last emitted record watermark and at least one of
     * the following holds: it was updated no earlier than the idle threshold, its emit queue is
     * non-empty, or its watermark is greater than the last watermark emitted by this subtask. The
     * candidate watermark is the minimum over all contributing shards. In parallel, a candidate
     * "next" watermark is computed from the head of each contributing shard's emit queue (falling
     * back to the shard's last emitted watermark when the queue is empty).
     *
     * <p>If no shard contributes, the source is marked temporarily idle, provided there are no
     * shard states at all or a positive shard idle interval is configured. Otherwise the candidate
     * is emitted when it is greater than the last emitted watermark, and the {@code nextWatermark}
     * field is updated with the candidate next watermark.
     */
    protected void emitWatermark() {
        LOG.debug(
                "Evaluating watermark for subtask {} time {}",
                indexOfThisConsumerSubtask,
                getCurrentTimeMillis());

        // Shards whose last update is older than this instant do not qualify by recency.
        // Without a positive idle interval the threshold is Long.MAX_VALUE, so the recency
        // check below can only pass for a shard whose lastUpdated is itself Long.MAX_VALUE.
        final long idleThreshold;
        if (shardIdleIntervalMillis > 0) {
            idleThreshold = getCurrentTimeMillis() - shardIdleIntervalMillis;
        } else {
            idleThreshold = Long.MAX_VALUE;
        }

        long minWatermark = Long.MAX_VALUE;
        long minNextWatermark = Long.MAX_VALUE;

        for (ShardWatermarkState shardState : shardWatermarks.values()) {
            final Watermark shardWatermark = shardState.lastEmittedRecordWatermark;
            if (shardWatermark == null) {
                // nothing has been emitted for this shard yet
                continue;
            }

            // consider only active shards, or those that would advance the watermark
            final boolean contributes =
                    shardState.lastUpdated >= idleThreshold
                            || shardState.emitQueue.getSize() > 0
                            || shardWatermark.getTimestamp() > lastWatermark;
            if (!contributes) {
                continue;
            }

            minWatermark = Math.min(minWatermark, shardWatermark.getTimestamp());

            // for sync, use the watermark of the next record, when available
            // otherwise watermark may stall when record is blocked by synchronization
            RecordEmitter.RecordQueue<RecordWrapper<T>> pendingRecords = shardState.emitQueue;
            RecordWrapper<T> head = pendingRecords.peek();
            Watermark upcomingWatermark = shardWatermark;
            if (head != null) {
                upcomingWatermark = head.watermark;
            }
            minNextWatermark = Math.min(minNextWatermark, upcomingWatermark.getTimestamp());
        }

        LOG.debug(
                "WatermarkEmitter subtask: {}, last watermark: {}, potential watermark: {}"
                        + ", potential next watermark: {}",
                indexOfThisConsumerSubtask,
                lastWatermark,
                minWatermark,
                minNextWatermark);

        // Long.MAX_VALUE still in place means that no shard contributed a watermark
        if (minWatermark == Long.MAX_VALUE) {
            final boolean mayGoIdle = shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0;
            if (mayGoIdle) {
                LOG.info(
                        "No active shard for subtask {}, marking the source idle.",
                        indexOfThisConsumerSubtask);
                // no active shard, signal downstream operators to not wait for a watermark
                sourceContext.markAsTemporarilyIdle();
                isIdle = true;
            }
            return;
        }

        // advance watermark if possible (watermarks can only be ascending)
        if (minWatermark > lastWatermark) {
            LOG.debug(
                    "Emitting watermark {} from subtask {}",
                    minWatermark,
                    indexOfThisConsumerSubtask);
            sourceContext.emitWatermark(new Watermark(minWatermark));
            lastWatermark = minWatermark;
            isIdle = false;
        }
        nextWatermark = minNextWatermark;
    }