in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java [157:196]
/**
 * Creates a {@link SourceReader} for this source, with a hook that is invoked whenever a
 * collection of splits finishes reading.
 *
 * <p>The deserialization schema is opened (with a metric group scoped under
 * {@code "deserializer"} and the user-code class loader from the reader context) before the
 * reader is constructed, so it is ready before any record is emitted.
 *
 * @param readerContext context providing metrics and the user-code class loader
 * @param splitFinishedHook callback receiving the ids of splits that have finished
 * @return a fully wired {@link KafkaSourceReader}
 * @throws Exception if opening the deserialization schema fails
 */
SourceReader<OUT, KafkaPartitionSplit> createReader(
        SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
        throws Exception {
    // Hand-off queue between the fetcher threads and the source reader.
    final FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
            recordsQueue = new FutureCompletingBlockingQueue<>();

    // Initialize the deserializer up front; it needs the metric group and class loader
    // from the reader context.
    deserializationSchema.open(
            new DeserializationSchema.InitializationContext() {
                @Override
                public MetricGroup getMetricGroup() {
                    return readerContext.metricGroup().addGroup("deserializer");
                }

                @Override
                public UserCodeClassLoader getUserCodeClassLoader() {
                    return readerContext.getUserCodeClassLoader();
                }
            });

    final KafkaSourceReaderMetrics readerMetrics =
            new KafkaSourceReaderMetrics(readerContext.metricGroup());

    // Each call produces a fresh split reader; the rack id (when a supplier is configured)
    // is resolved lazily at creation time, once per split reader.
    final Supplier<KafkaPartitionSplitReader> splitReaderFactory =
            () ->
                    new KafkaPartitionSplitReader(
                            props,
                            readerContext,
                            readerMetrics,
                            rackIdSupplier == null ? null : rackIdSupplier.get());

    return new KafkaSourceReader<>(
            recordsQueue,
            new KafkaSourceFetcherManager(
                    recordsQueue, splitReaderFactory::get, splitFinishedHook),
            new KafkaRecordEmitter<>(deserializationSchema),
            toConfiguration(props),
            readerContext,
            readerMetrics);
}