in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java [396:422]
/**
 * Starts the enumerator registered for every cluster id in {@code latestClusterTopicsMap}.
 *
 * <p>If starting an enumerator throws a {@link KafkaException}, the metadata service is asked
 * whether the cluster is still active:
 *
 * <ul>
 *   <li>active cluster: the failure is rethrown wrapped in a {@link RuntimeException};
 *   <li>inactive cluster: the cluster's enumerator context and enumerator are removed from
 *       their maps and closed, and the loop continues with the next cluster.
 * </ul>
 *
 * @throws RuntimeException if an enumerator of an active cluster fails to start, or if closing
 *     the context or the enumerator of an inactive cluster fails
 */
private void startAllEnumerators() {
    for (String kafkaClusterId : latestClusterTopicsMap.keySet()) {
        try {
            // starts enumerators and handles split discovery and assignment
            clusterEnumeratorMap.get(kafkaClusterId).start();
        } catch (KafkaException e) {
            if (kafkaMetadataService.isClusterActive(kafkaClusterId)) {
                throw new RuntimeException(
                        String.format("Failed to create enumerator for %s", kafkaClusterId), e);
            }

            logger.info(
                    "Found inactive cluster {} while initializing, removing enumerator",
                    kafkaClusterId,
                    e);

            // Close the context and the enumerator independently so that a failure while
            // closing the context cannot leave the enumerator registered and unclosed.
            Exception closeFailure = null;
            try {
                clusterEnumContextMap.remove(kafkaClusterId).close();
            } catch (Exception ex) {
                closeFailure = ex;
            }
            try {
                clusterEnumeratorMap.remove(kafkaClusterId).close();
            } catch (Exception ex) {
                if (closeFailure == null) {
                    closeFailure = ex;
                } else {
                    // keep the first failure as the cause, but do not lose the second one
                    closeFailure.addSuppressed(ex);
                }
            }

            if (closeFailure != null) {
                // closing enumerator throws an exception, let error propagate and restart
                // the job
                throw new RuntimeException(
                        "Failed to close enum context for " + kafkaClusterId, closeFailure);
            }
        }
    }
}