in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java [414:469]
/**
 * Builds a {@link KafkaSourceReader} dedicated to a single Kafka cluster.
 *
 * <p>The reader gets its own {@link Properties} object: the shared {@code properties} are copied
 * in first, then the entry of {@code clustersProperties} for the given cluster, and finally the
 * client id prefix is set from the cluster id. A {@link KafkaClusterMetricGroup} is created for
 * the cluster and registered with {@code kafkaClusterMetricGroupManager} before the
 * deserialization schema is opened against it.
 *
 * @param kafkaClusterId id of the Kafka cluster the new reader belongs to
 * @return a new reader wired to its own element queue, fetcher manager and metrics
 * @throws NullPointerException if {@code clustersProperties} has no entry for {@code
 *     kafkaClusterId} (raised by the {@code Preconditions.checkNotNull} guard)
 * @throws Exception propagated from the calls made here, e.g. opening the deserialization schema
 */
private KafkaSourceReader<T> createReader(String kafkaClusterId) throws Exception {
    FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
            recordQueue = new FutureCompletingBlockingQueue<>();

    // Shared properties are copied first; the cluster's own properties are copied afterwards
    // into the same object.
    Properties clusterReaderProps = new Properties();
    KafkaPropertiesUtil.copyProperties(properties, clusterReaderProps);
    Properties clusterSpecificProps =
            Preconditions.checkNotNull(
                    clustersProperties.get(kafkaClusterId),
                    "Properties for cluster %s is not found. Current Kafka cluster ids: %s",
                    kafkaClusterId,
                    clustersProperties.keySet());
    KafkaPropertiesUtil.copyProperties(clusterSpecificProps, clusterReaderProps);
    KafkaPropertiesUtil.setClientIdPrefix(clusterReaderProps, kafkaClusterId);

    // A per-cluster metric group keeps metrics of different clusters apart.
    KafkaClusterMetricGroup clusterMetricGroup =
            new KafkaClusterMetricGroup(
                    dynamicKafkaSourceMetricGroup, readerContext.metricGroup(), kafkaClusterId);
    kafkaClusterMetricGroupManager.register(kafkaClusterId, clusterMetricGroup);
    KafkaSourceReaderMetrics readerMetrics = new KafkaSourceReaderMetrics(clusterMetricGroup);

    DeserializationSchema.InitializationContext deserializerContext =
            new DeserializationSchema.InitializationContext() {
                @Override
                public MetricGroup getMetricGroup() {
                    // Hang the deserializer metrics under the per-cluster group rather than
                    // the source reader metric group: metrics could otherwise collide, and
                    // the `kafkaCluster` level is what tells the instances apart.
                    return clusterMetricGroup.addGroup("deserializer");
                }

                @Override
                public UserCodeClassLoader getUserCodeClassLoader() {
                    return readerContext.getUserCodeClassLoader();
                }
            };
    deserializationSchema.open(deserializerContext);

    KafkaRecordEmitter<T> emitter = new KafkaRecordEmitter<>(deserializationSchema);

    // Each split reader is created lazily through the supplier; the split-finished hook is
    // intentionally a no-op.
    KafkaSourceFetcherManager fetcherManager =
            new KafkaSourceFetcherManager(
                    recordQueue,
                    () ->
                            new KafkaPartitionSplitReaderWrapper(
                                    clusterReaderProps,
                                    readerContext,
                                    readerMetrics,
                                    kafkaClusterId),
                    finishedSplits -> {});

    return new KafkaSourceReader<>(
            recordQueue,
            fetcherManager,
            emitter,
            toConfiguration(clusterReaderProps),
            readerContext,
            readerMetrics);
}