private void startAllEnumerators()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java [396:422]


    private void startAllEnumerators() {
        for (String kafkaClusterId : latestClusterTopicsMap.keySet()) {
            try {
                // starts enumerators and handles split discovery and assignment
                clusterEnumeratorMap.get(kafkaClusterId).start();
            } catch (KafkaException e) {
                if (kafkaMetadataService.isClusterActive(kafkaClusterId)) {
                    throw new RuntimeException(
                            String.format("Failed to create enumerator for %s", kafkaClusterId), e);
                } else {
                    logger.info(
                            "Found inactive cluster {} while initializing, removing enumerator",
                            kafkaClusterId,
                            e);
                    try {
                        clusterEnumContextMap.remove(kafkaClusterId).close();
                        clusterEnumeratorMap.remove(kafkaClusterId).close();
                    } catch (Exception ex) {
                        // closing enumerator throws an exception, let error propagate and restart
                        // the job
                        throw new RuntimeException(
                                "Failed to close enum context for " + kafkaClusterId, ex);
                    }
                }
            }
        }
    }