public SourceReader createReader()

in src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java [105:151]


    /**
     * Builds the source reader for the given reader context.
     *
     * <p>The method opens the deserialization schema, creates the reader metrics, and wires one
     * {@link RocketMQSplitReader} instance into both the fetcher manager and the returned reader
     * through a single shared supplier.
     *
     * @param readerContext context that supplies the metric group and the user-code class loader
     * @return a {@link RocketMQSourceReader} assembled from the queue, fetcher manager, record
     *     emitter, metrics and split-reader supplier created here
     * @throws Exception propagated from any call made while assembling the reader, for example
     *     opening the deserialization schema
     */
    public SourceReader<OUT, RocketMQSourceSplit> createReader(SourceReaderContext readerContext)
            throws Exception {

        // Queue instance handed to both the fetcher manager and the returned reader.
        final FutureCompletingBlockingQueue<RecordsWithSplitIds<MessageView>> handoverQueue =
                new FutureCompletingBlockingQueue<>();

        // The schema is opened with a context backed by the reader context: its metrics go into
        // a "deserializer" sub-group and it receives the reader's user-code class loader.
        final DeserializationSchema.InitializationContext schemaInitContext =
                new DeserializationSchema.InitializationContext() {
                    @Override
                    public MetricGroup getMetricGroup() {
                        return readerContext.metricGroup().addGroup("deserializer");
                    }

                    @Override
                    public UserCodeClassLoader getUserCodeClassLoader() {
                        return readerContext.getUserCodeClassLoader();
                    }
                };
        deserializationSchema.open(schemaInitContext);

        final RocketMQSourceReaderMetrics readerMetrics =
                new RocketMQSourceReaderMetrics(readerContext.metricGroup());

        // Exactly one split reader is created; the supplier always returns this same instance.
        final RocketMQSplitReader<OUT> sharedSplitReader =
                new RocketMQSplitReader<>(
                        configuration, readerContext, deserializationSchema, readerMetrics);
        final Supplier<SplitReader<MessageView, RocketMQSourceSplit>> splitReaderSupplier =
                () -> sharedSplitReader;

        // The third argument is a no-op callback.
        final RocketMQSourceFetcherManager fetcherManager =
                new RocketMQSourceFetcherManager(
                        handoverQueue, splitReaderSupplier, ignored -> {});

        final RocketMQRecordEmitter<OUT> emitter =
                new RocketMQRecordEmitter<>(deserializationSchema);

        return new RocketMQSourceReader<>(
                handoverQueue,
                fetcherManager,
                emitter,
                configuration,
                readerContext,
                readerMetrics,
                splitReaderSupplier);
    }