in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java [139:171]
SourceReader<OUT, KafkaPartitionSplit> createReader(
SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
throws Exception {
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
elementsQueue = new FutureCompletingBlockingQueue<>();
deserializationSchema.open(
new DeserializationSchema.InitializationContext() {
@Override
public MetricGroup getMetricGroup() {
return readerContext.metricGroup().addGroup("deserializer");
}
@Override
public UserCodeClassLoader getUserCodeClassLoader() {
return readerContext.getUserCodeClassLoader();
}
});
final KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
new KafkaSourceReaderMetrics(readerContext.metricGroup());
Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
() -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
return new KafkaSourceReader<>(
elementsQueue,
new KafkaSourceFetcherManager(
elementsQueue, splitReaderSupplier::get, splitFinishedHook),
recordEmitter,
toConfiguration(props),
readerContext,
kafkaSourceReaderMetrics);
}