golang/utils/MQAdminUtils.go (122 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package utils import ( "fmt" "log" "os/exec" "path" "runtime" ) var root string func init() { _, filename, _, ok := runtime.Caller(0) // get whole project base path root = path.Dir(path.Dir(path.Dir(filename))) if !ok { log.Fatal("get project root path failed") } } func CreateTopic(topicName string, brokerAddr string, clusterName string, nameserver string) { // use absolute path command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName if nameserver != "" { command += " -n " + nameserver } if brokerAddr != "" { command += " -b " + brokerAddr } if clusterName != "" { command += " -c " + clusterName } fmt.Println(command) out, err := exec.Command("/bin/bash", "-c", command).Output() if err != nil { log.Fatal(err) } fmt.Println(string(out)) } func CreateDelayTopic(topicName string, brokerAddr string, clusterName string, nameserver string) { // use absolute path command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName if nameserver != "" { command += " -n " + nameserver } if brokerAddr != "" { command += " -b " + brokerAddr } if clusterName != "" { command += " -c " + clusterName } command += " -a " + "+message.type=DELAY" fmt.Println(command) out, err := exec.Command("/bin/bash", "-c", command).Output() if err != nil { log.Fatal(err) } fmt.Println(string(out)) } func CreateFIFOTopic(topicName string, brokerAddr string, clusterName string, nameserver string) { // use absolute path command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName if nameserver != "" { command += " -n " + nameserver } if brokerAddr != "" { command += " -b " + brokerAddr } if clusterName != "" { command += " -c " + clusterName } command += " -a " + "+message.type=FIFO" fmt.Println(command) out, err := exec.Command("/bin/bash", "-c", command).Output() if err != nil { log.Fatal(err) } fmt.Println(string(out)) } func CreateTransactionTopic(topicName string, brokerAddr string, clusterName string, nameserver string) { // use absolute path command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName if nameserver != "" { command += " -n " + nameserver } if brokerAddr != "" { command += " -b " + brokerAddr } if clusterName != "" { command += " -c " + clusterName } command += " -a " + "+message.type=TRANSACTION" fmt.Println(command) out, err := exec.Command("/bin/bash", "-c", command).Output() if err != nil { log.Fatal(err) } fmt.Println(string(out)) } func CreateOrderlyConsumerGroup(consumerGroup string, brokerAddr string, clusterName string, nameserver string) { // use absolute path command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateSubGroup -g " + consumerGroup if nameserver != "" { command += " -n " + nameserver } if brokerAddr != "" { command += " -b " + brokerAddr } if clusterName != "" { command += " -c " + clusterName } command += " -s true -o true -m false -d false " fmt.Println(command) out, err := exec.Command("/bin/bash", "-c", command).Output() if err != nil { log.Fatal(err) } fmt.Println(string(out)) } func ClusterList(nameserver string) string { command := "sh " + root + "/rocketmq-admintools/bin/mqadmin clusterlist" if nameserver != "" { command += " -n " + nameserver } fmt.Println(command) out, err := exec.Command("/bin/bash", "-c", command).Output() if err != nil { log.Fatal(err) } return string(out) }