public static PulsarSourceReader create()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java [254:308]


    /**
     * Builds a {@link PulsarSourceReader} together with the element queue, Pulsar client, Pulsar
     * admin and fetcher manager it is constructed from.
     *
     * <p>The Pulsar client and admin are created inside this method and passed to the returned
     * reader. If any step after their creation fails, they are closed here before the failure is
     * rethrown, because no reader exists yet that could release them.
     *
     * @param sourceConfiguration source options; supplies the message queue capacity, the client
     *     and admin settings and the schema evolution switch.
     * @param deserializationSchema schema that is opened here and passed to the reader. When schema
     *     evolution is enabled it is cast to {@link PulsarSchemaWrapper}.
     * @param pulsarCrypto crypto settings passed to every split reader that gets created.
     * @param readerContext reader context providing the metric group and the configuration.
     * @param <OUT> the type of the records produced by the deserialization schema.
     * @return a reader wired to the freshly created queue, fetcher manager, client and admin.
     * @throws Exception if creating the admin, opening the deserialization schema or assembling the
     *     reader fails; exceptions raised while closing the Pulsar handles are attached to it as
     *     suppressed exceptions.
     */
    public static <OUT> PulsarSourceReader<OUT> create(
            SourceConfiguration sourceConfiguration,
            PulsarDeserializationSchema<OUT> deserializationSchema,
            PulsarCrypto pulsarCrypto,
            SourceReaderContext readerContext)
            throws Exception {

        // Create a message queue with the predefined source option.
        int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
        FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue =
                new FutureCompletingBlockingQueue<>(queueCapacity);

        // The nested try blocks keep both handles effectively final (they are captured by the
        // split reader supplier) while still guaranteeing cleanup in reverse creation order.
        PulsarClient pulsarClient = createClient(sourceConfiguration);
        try {
            PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
            try {
                // Initialize the deserialization schema before creating the pulsar reader.
                PulsarDeserializationSchemaInitializationContext initializationContext =
                        new PulsarDeserializationSchemaInitializationContext(
                                readerContext, pulsarClient);
                deserializationSchema.open(initializationContext, sourceConfiguration);

                // Choose the right schema bytes to use.
                Schema<byte[]> schema;
                if (sourceConfiguration.isEnableSchemaEvolution()) {
                    // Wrap the schema into a byte array schema with extra schema info check.
                    // NOTE(review): the cast assumes callers only enable schema evolution
                    // together with a PulsarSchemaWrapper - confirm against the builder.
                    PulsarSchema<?> pulsarSchema =
                            ((PulsarSchemaWrapper<?>) deserializationSchema).pulsarSchema();
                    schema = new BytesSchema(pulsarSchema);
                } else {
                    schema = Schema.BYTES;
                }

                // Create an ordered split reader supplier.
                Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier =
                        () ->
                                new PulsarPartitionSplitReader(
                                        pulsarClient,
                                        pulsarAdmin,
                                        sourceConfiguration,
                                        schema,
                                        pulsarCrypto,
                                        readerContext.metricGroup());

                PulsarSourceFetcherManager fetcherManager =
                        new PulsarSourceFetcherManager(
                                elementsQueue,
                                splitReaderSupplier,
                                readerContext.getConfiguration());

                return new PulsarSourceReader<>(
                        elementsQueue,
                        fetcherManager,
                        deserializationSchema,
                        sourceConfiguration,
                        pulsarClient,
                        pulsarAdmin,
                        readerContext);
            } catch (Throwable failure) {
                // Nothing owns the admin yet, release it before propagating.
                closeAfterFailure(pulsarAdmin, failure);
                throw failure;
            }
        } catch (Throwable failure) {
            // Reached for admin creation failures as well as anything rethrown above.
            closeAfterFailure(pulsarClient, failure);
            throw failure;
        }
    }

    /**
     * Closes {@code resource} on a failure path; a close error is attached to {@code failure} as a
     * suppressed exception instead of replacing it.
     */
    private static void closeAfterFailure(AutoCloseable resource, Throwable failure) {
        try {
            resource.close();
        } catch (Exception closeError) {
            // Throwable#addSuppressed rejects self-suppression, so guard against it.
            if (closeError != failure) {
                failure.addSuppressed(closeError);
            }
        }
    }