SourceReader createReader()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java [157:196]


    /**
     * Assembles a {@link KafkaSourceReader} for the given reader context.
     *
     * <p>The deserialization schema is opened first, with an initialization context whose metric
     * group is the {@code "deserializer"} sub-group of the reader's metric group and whose
     * user-code class loader is the one exposed by the reader context. Afterwards the reader
     * metrics, the split-reader factory, the record emitter and the fetcher manager are created
     * and handed to the new reader.
     *
     * @param readerContext context of the source reader being created; supplies the metric group
     *     and user-code class loader, and is passed on to the split readers and the reader itself
     * @param splitFinishedHook hook forwarded unchanged to the {@link KafkaSourceFetcherManager}
     * @return the newly constructed source reader
     * @throws Exception propagated from any call made here, e.g. from opening the
     *     deserialization schema
     */
    SourceReader<OUT, KafkaPartitionSplit> createReader(
            SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
            throws Exception {
        // Hand-over queue shared by the fetcher manager (producer) and the reader (consumer).
        FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
                recordsQueue = new FutureCompletingBlockingQueue<>();

        DeserializationSchema.InitializationContext schemaInitContext =
                new DeserializationSchema.InitializationContext() {
                    @Override
                    public MetricGroup getMetricGroup() {
                        return readerContext.metricGroup().addGroup("deserializer");
                    }

                    @Override
                    public UserCodeClassLoader getUserCodeClassLoader() {
                        return readerContext.getUserCodeClassLoader();
                    }
                };
        deserializationSchema.open(schemaInitContext);

        final KafkaSourceReaderMetrics readerMetrics =
                new KafkaSourceReaderMetrics(readerContext.metricGroup());

        // The rack id is resolved lazily, once per split reader that gets created; a missing
        // supplier results in a null rack id.
        Supplier<KafkaPartitionSplitReader> splitReaderFactory =
                () ->
                        new KafkaPartitionSplitReader(
                                props,
                                readerContext,
                                readerMetrics,
                                rackIdSupplier == null ? null : rackIdSupplier.get());

        KafkaRecordEmitter<OUT> emitter = new KafkaRecordEmitter<>(deserializationSchema);

        KafkaSourceFetcherManager fetcherManager =
                new KafkaSourceFetcherManager(
                        recordsQueue, splitReaderFactory::get, splitFinishedHook);

        return new KafkaSourceReader<>(
                recordsQueue,
                fetcherManager,
                emitter,
                toConfiguration(props),
                readerContext,
                readerMetrics);
    }