private void createStorageTopic()

in storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java [113:160]


    /**
     * Ensures the storage topic exists, creating it on every broker of the cluster when needed.
     *
     * <p>Starts {@code mqAdminExt} and looks up the route of {@code storageTopic}. If the route
     * already lists both broker data and queue data, nothing is created. Otherwise the topic is
     * created (or updated) with 8 read and 8 write queues on each broker returned by
     * {@code examineBrokerClusterInfo()}. Once the admin client has been started it is shut down
     * on every exit path, including the "already exists" early return and failures.
     *
     * @throws SchemaException if the admin client cannot be started, if the route lookup fails
     *     with anything other than {@code MQClientException}, or if topic creation fails
     */
    private void createStorageTopic() {
        try {
            mqAdminExt.start();
        } catch (MQClientException e) {
            throw new SchemaException("Rocketmq admin tool start failed", e);
        }

        // From here on the admin client is running, so every path must release it.
        try {
            if (storageTopicRouteExists()) {
                log.info("the storage topic {} already exist, no need to create", storageTopic);
                return;
            }

            try {
                // The configuration is identical for every broker, so build it once.
                TopicConfig topicConfig = buildStorageTopicConfig();
                ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
                for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
                    String brokerAddr = brokerData.selectBrokerAddr();
                    mqAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig);
                }
            } catch (Exception e) {
                throw new SchemaException("Failed to create storage rocketmq topic", e);
            }
        } finally {
            mqAdminExt.shutdown();
        }
    }

    /** Returns true when the route of the storage topic lists both broker data and queue data. */
    private boolean storageTopicRouteExists() {
        TopicRouteData topicRouteData = null;
        try {
            topicRouteData = mqAdminExt.examineTopicRouteInfo(storageTopic);
        } catch (MQClientException e) {
            // Treated as "topic not found": fall through with a null route so the caller creates it.
            log.warn("maybe the storage topic {} not found, need to create", storageTopic);
        } catch (Exception e) {
            throw new SchemaException("Failed to create storage rocketmq topic", e);
        }

        return topicRouteData != null
            && CollectionUtils.isNotEmpty(topicRouteData.getBrokerDatas())
            && CollectionUtils.isNotEmpty(topicRouteData.getQueueDatas());
    }

    /** Builds the storage topic configuration: 8 read/write queues, plus compaction when enabled. */
    private TopicConfig buildStorageTopicConfig() {
        TopicConfig topicConfig = new TopicConfig();
        topicConfig.setTopicName(storageTopic);
        topicConfig.setReadQueueNums(8);
        topicConfig.setWriteQueueNums(8);
        if (useCompactTopic) {
            Map<String, String> attributes = new HashMap<>(1);
            attributes.put("+delete.policy", "COMPACTION");
            topicConfig.setAttributes(attributes);
        }
        return topicConfig;
    }