protected AbstractFetcher()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java [142:214]


    protected AbstractFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            MetricGroup consumerMetricGroup,
            boolean useMetrics)
            throws Exception {
        this.sourceContext = checkNotNull(sourceContext);
        this.watermarkOutput = new SourceContextWatermarkOutputAdapter<>(sourceContext);
        this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.userCodeClassLoader = checkNotNull(userCodeClassLoader);

        this.useMetrics = useMetrics;
        this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
        this.legacyCurrentOffsetsMetricGroup =
                consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
        this.legacyCommittedOffsetsMetricGroup =
                consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);

        this.watermarkStrategy = watermarkStrategy;

        if (watermarkStrategy == null) {
            timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
        } else {
            timestampWatermarkMode = WITH_WATERMARK_GENERATOR;
        }

        this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();

        // initialize subscribed partition states with seed partitions
        this.subscribedPartitionStates =
                createPartitionStateHolders(
                        seedPartitionsWithInitialOffsets,
                        timestampWatermarkMode,
                        watermarkStrategy,
                        userCodeClassLoader);

        // check that all seed partition states have a defined offset
        for (KafkaTopicPartitionState<?, ?> partitionState : subscribedPartitionStates) {
            if (!partitionState.isOffsetDefined()) {
                throw new IllegalArgumentException(
                        "The fetcher was assigned seed partitions with undefined initial offsets.");
            }
        }

        // all seed partitions are not assigned yet, so should be added to the unassigned partitions
        // queue
        for (KafkaTopicPartitionState<T, KPH> partition : subscribedPartitionStates) {
            unassignedPartitionsQueue.add(partition);
        }

        // register metrics for the initial seed partitions
        if (useMetrics) {
            registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
        }

        // if we have periodic watermarks, kick off the interval scheduler
        if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
            PeriodicWatermarkEmitter<T, KPH> periodicEmitter =
                    new PeriodicWatermarkEmitter<>(
                            checkpointLock,
                            subscribedPartitionStates,
                            watermarkOutputMultiplexer,
                            processingTimeProvider,
                            autoWatermarkInterval);

            periodicEmitter.start();
        }
    }