private KafkaSourceReader<T> createReader(String kafkaClusterId)

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java [414:469]


    private KafkaSourceReader<T> createReader(String kafkaClusterId) throws Exception {
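        // hand-off queue between the split fetcher threads and the reader's main thread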
        FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
                elementsQueue = new FutureCompletingBlockingQueue<>();
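        // layer the reader properties: common source properties first, then cluster-specific
        // overrides, then a per-cluster client.id prefix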
        Properties readerSpecificProperties = new Properties();
        KafkaPropertiesUtil.copyProperties(properties, readerSpecificProperties);
        KafkaPropertiesUtil.copyProperties(
                Preconditions.checkNotNull(
                        clustersProperties.get(kafkaClusterId),
                        "Properties for cluster %s is not found. Current Kafka cluster ids: %s",
                        kafkaClusterId,
                        clustersProperties.keySet()),
                readerSpecificProperties);
        KafkaPropertiesUtil.setClientIdPrefix(readerSpecificProperties, kafkaClusterId);

        // layer a kafka cluster group to distinguish metrics by cluster
        KafkaClusterMetricGroup kafkaClusterMetricGroup =
                new KafkaClusterMetricGroup(
                        dynamicKafkaSourceMetricGroup, readerContext.metricGroup(), kafkaClusterId);
        kafkaClusterMetricGroupManager.register(kafkaClusterId, kafkaClusterMetricGroup);
        KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
                new KafkaSourceReaderMetrics(kafkaClusterMetricGroup);

        deserializationSchema.open(
                new DeserializationSchema.InitializationContext() {
                    @Override
                    public MetricGroup getMetricGroup() {
                        // use kafkaClusterMetricGroup instead of the sourceReaderMetricGroup
                        // since there could be metric collisions; the `kafkaCluster` group is
                        // necessary to distinguish between instances of this metric
                        return kafkaClusterMetricGroup.addGroup("deserializer");
                    }

                    @Override
                    public UserCodeClassLoader getUserCodeClassLoader() {
                        return readerContext.getUserCodeClassLoader();
                    }
                });

        // wrap the deserializer in a record emitter and assemble the per-cluster KafkaSourceReader
        KafkaRecordEmitter<T> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
        return new KafkaSourceReader<>(
                elementsQueue,
                new KafkaSourceFetcherManager(
                        elementsQueue,
                        () ->
                                new KafkaPartitionSplitReaderWrapper(
                                        readerSpecificProperties,
                                        readerContext,
                                        kafkaSourceReaderMetrics,
                                        kafkaClusterId),
                        (ignore) -> {}),
                recordEmitter,
                toConfiguration(readerSpecificProperties),
                readerContext,
                kafkaSourceReaderMetrics);
    }
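
The two KafkaPropertiesUtil.copyProperties calls layer the cluster-specific properties on top of the common source properties, so a per-cluster value wins whenever both define the same key. Below is a minimal sketch of that layering using plain java.util.Properties, assuming copyProperties behaves like Properties.putAll; the cluster id and property values are made up for illustration.

import java.util.Properties;

public class PropertyLayeringSketch {

    public static void main(String[] args) {
        // common properties shared by every Kafka cluster (hypothetical values)
        Properties common = new Properties();
        common.setProperty("group.id", "dynamic-kafka-source");
        common.setProperty("fetch.max.bytes", "52428800");

        // cluster-specific properties for a hypothetical cluster id "cluster-a"
        Properties clusterA = new Properties();
        clusterA.setProperty("bootstrap.servers", "broker-a:9092");
        clusterA.setProperty("fetch.max.bytes", "1048576"); // overrides the common value

        // same order as createReader: common properties first, then the cluster overrides
        Properties readerSpecific = new Properties();
        readerSpecific.putAll(common);
        readerSpecific.putAll(clusterA);

        System.out.println(readerSpecific.getProperty("fetch.max.bytes"));   // 1048576 (cluster value wins)
        System.out.println(readerSpecific.getProperty("group.id"));          // dynamic-kafka-source
        System.out.println(readerSpecific.getProperty("bootstrap.servers")); // broker-a:9092
    }
}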