in src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java [262:297]
private Set<MessageQueue> requestServiceDiscovery() {
// Ensure all subtasks have been initialized
try {
if (initTask != null) {
for (int i = 0; i < context.currentParallelism(); i++) {
if (!initTask[i]) {
context.sendEventToSourceReader(i, new SourceDetectEvent());
}
}
}
} catch (Exception e) {
log.error("init request resend error, please check task has started");
return null;
}
Set<String> topicSet =
Sets.newHashSet(
configuration
.getString(RocketMQSourceOptions.TOPIC)
.split(RocketMQSourceOptions.TOPIC_SEPARATOR));
return topicSet.stream()
.flatMap(
topic -> {
try {
return consumer.fetchMessageQueues(topic).get().stream();
} catch (Exception e) {
log.error(
"Request topic route for service discovery error, topic={}",
topic,
e);
}
return Stream.empty();
})
.collect(Collectors.toSet());
}