in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java [356:394]
private KafkaSourceEnumerator createEnumeratorWithAssignedTopicPartitions(
String kafkaClusterId,
Set<String> topics,
KafkaSourceEnumState kafkaSourceEnumState,
Properties fetchedProperties) {
final Runnable signalNoMoreSplitsCallback;
if (Boundedness.BOUNDED.equals(boundedness)) {
signalNoMoreSplitsCallback = this::handleNoMoreSplits;
} else {
signalNoMoreSplitsCallback = null;
}
StoppableKafkaEnumContextProxy context =
stoppableKafkaEnumContextProxyFactory.create(
enumContext,
kafkaClusterId,
kafkaMetadataService,
signalNoMoreSplitsCallback);
Properties consumerProps = new Properties();
KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps);
KafkaPropertiesUtil.copyProperties(properties, consumerProps);
KafkaPropertiesUtil.setClientIdPrefix(consumerProps, kafkaClusterId);
KafkaSourceEnumerator enumerator =
new KafkaSourceEnumerator(
KafkaSubscriber.getTopicListSubscriber(new ArrayList<>(topics)),
startingOffsetsInitializer,
stoppingOffsetInitializer,
consumerProps,
context,
boundedness,
kafkaSourceEnumState);
clusterEnumContextMap.put(kafkaClusterId, context);
clusterEnumeratorMap.put(kafkaClusterId, enumerator);
return enumerator;
}