private Set<MessageQueue> requestServiceDiscovery()

in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [262:297]


    /**
     * Collects the message queues of every configured topic.
     *
     * <p>Before querying, a {@code SourceDetectEvent} is re-sent to each subtask index below the
     * current parallelism whose {@code initTask} flag is still {@code false}. If that resend
     * throws, discovery is abandoned for this round.
     *
     * <p>Topics are read from {@code RocketMQSourceOptions.TOPIC} and split on {@code
     * RocketMQSourceOptions.TOPIC_SEPARATOR}. A topic whose queue lookup fails is logged and
     * contributes no queues; the remaining topics are still processed.
     *
     * @return the union of the message queues fetched for all configured topics, or {@code null}
     *     when re-sending the detect events failed
     */
    private Set<MessageQueue> requestServiceDiscovery() {
        // Ensure all subtasks have been initialized
        try {
            if (initTask != null) {
                for (int i = 0; i < context.currentParallelism(); i++) {
                    if (!initTask[i]) {
                        context.sendEventToSourceReader(i, new SourceDetectEvent());
                    }
                }
            }
        } catch (Exception e) {
            // Pass the exception to the logger so the root cause (and stack trace) is not lost.
            log.error("init request resend error, please check task has started", e);
            // NOTE(review): null is kept as the failure signal rather than an empty set, since an
            // empty set would be indistinguishable from "no queues exist" — confirm that the
            // caller handles null.
            return null;
        }

        Set<String> topicSet =
                Sets.newHashSet(
                        configuration
                                .getString(RocketMQSourceOptions.TOPIC)
                                .split(RocketMQSourceOptions.TOPIC_SEPARATOR));

        return topicSet.stream()
                .flatMap(
                        topic -> {
                            try {
                                return consumer.fetchMessageQueues(topic).get().stream();
                            } catch (Exception e) {
                                if (e instanceof InterruptedException) {
                                    // The broad catch would otherwise swallow the interruption;
                                    // restore the flag so code further up can still observe it.
                                    Thread.currentThread().interrupt();
                                }
                                log.error(
                                        "Request topic route for service discovery error, topic={}",
                                        topic,
                                        e);
                                // Best effort: a failing topic yields no queues for this round.
                                return Stream.empty();
                            }
                        })
                .collect(Collectors.toSet());
    }