in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java [254:308]
public static <OUT> PulsarSourceReader<OUT> create(
SourceConfiguration sourceConfiguration,
PulsarDeserializationSchema<OUT> deserializationSchema,
PulsarCrypto pulsarCrypto,
SourceReaderContext readerContext)
throws Exception {
// Create a message queue with the predefined source option.
int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue =
new FutureCompletingBlockingQueue<>(queueCapacity);
PulsarClient pulsarClient = createClient(sourceConfiguration);
PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
// Initialize the deserialization schema before creating the pulsar reader.
PulsarDeserializationSchemaInitializationContext initializationContext =
new PulsarDeserializationSchemaInitializationContext(readerContext, pulsarClient);
deserializationSchema.open(initializationContext, sourceConfiguration);
// Choose the right schema bytes to use.
Schema<byte[]> schema;
if (sourceConfiguration.isEnableSchemaEvolution()) {
// Wrap the schema into a byte array schema with extra schema info check.
PulsarSchema<?> pulsarSchema =
((PulsarSchemaWrapper<?>) deserializationSchema).pulsarSchema();
schema = new BytesSchema(pulsarSchema);
} else {
schema = Schema.BYTES;
}
// Create an ordered split reader supplier.
Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier =
() ->
new PulsarPartitionSplitReader(
pulsarClient,
pulsarAdmin,
sourceConfiguration,
schema,
pulsarCrypto,
readerContext.metricGroup());
PulsarSourceFetcherManager fetcherManager =
new PulsarSourceFetcherManager(
elementsQueue, splitReaderSupplier, readerContext.getConfiguration());
return new PulsarSourceReader<>(
elementsQueue,
fetcherManager,
deserializationSchema,
sourceConfiguration,
pulsarClient,
pulsarAdmin,
readerContext);
}