in storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java [113:160]
/**
 * Ensures the storage topic exists, creating it when the route lookup does not find it.
 *
 * <p>The admin client is started, then the route of {@code storageTopic} is examined. If the
 * route carries both broker data and queue data, the topic is considered present and nothing
 * is created. Otherwise a topic config with 8 read queues and 8 write queues is sent to one
 * selected address of every broker listed in the cluster info; when {@code useCompactTopic}
 * is set, the {@code +delete.policy=COMPACTION} attribute is added to that config.
 *
 * <p>Once the admin client has started successfully it is shut down on every exit path,
 * including the "topic already exists" early return and any failure.
 *
 * @throws SchemaException if the admin client cannot be started, if the route lookup fails
 *     with anything other than an {@code MQClientException}, or if topic creation fails
 */
private void createStorageTopic() {
    try {
        mqAdminExt.start();
    } catch (MQClientException e) {
        throw new SchemaException("Rocketmq admin tool start failed", e);
    }

    // From here on the admin client is running, so every exit path must shut it down.
    try {
        // check if the topic exists
        TopicRouteData topicRouteData = null;
        try {
            topicRouteData = mqAdminExt.examineTopicRouteInfo(storageTopic);
        } catch (MQClientException e) {
            // Treated as "topic not found" and handled by creating the topic below; the
            // exception is still logged so other client failures are not hidden.
            log.warn("maybe the storage topic {} not found, need to create", storageTopic, e);
        } catch (Exception e) {
            throw new SchemaException("Failed to create storage rocketmq topic", e);
        }

        if (topicRouteData != null && CollectionUtils.isNotEmpty(topicRouteData.getBrokerDatas())
            && CollectionUtils.isNotEmpty(topicRouteData.getQueueDatas())) {
            log.info("the storage topic {} already exist, no need to create", storageTopic);
            return;
        }

        try {
            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
            HashMap<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
            for (BrokerData brokerData : brokerAddrTable.values()) {
                // A fresh config is built per broker, exactly as before.
                TopicConfig topicConfig = new TopicConfig();
                topicConfig.setTopicName(storageTopic);
                topicConfig.setReadQueueNums(8);
                topicConfig.setWriteQueueNums(8);
                if (useCompactTopic) {
                    Map<String, String> attributes = new HashMap<>(1);
                    attributes.put("+delete.policy", "COMPACTION");
                    topicConfig.setAttributes(attributes);
                }
                String brokerAddr = brokerData.selectBrokerAddr();
                mqAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig);
            }
        } catch (Exception e) {
            throw new SchemaException("Failed to create storage rocketmq topic", e);
        }
    } finally {
        mqAdminExt.shutdown();
    }
}