in core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java [217:237]
private static boolean check(DefaultMQAdminExt mqAdmin, String topicName) {
if (existTopic.contains(topicName)) {
return true;
}
try {
mqAdmin.examineTopicRouteInfo(topicName);
existTopic.add(topicName);
return true;
} catch (RemotingException | InterruptedException e) {
logger.error("examine topic route info error.", e);
throw new RuntimeException("examine topic route info error.", e);
} catch (MQClientException exception) {
if (exception.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
logger.info("topic[{}] does not exist, create it.", topicName);
} else {
throw new RuntimeException(exception);
}
}
return false;
}