golang/utils/MQAdminUtils.go (122 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"fmt"
"log"
"os/exec"
"path"
"runtime"
)
var root string
func init() {
_, filename, _, ok := runtime.Caller(0)
// get whole project base path
root = path.Dir(path.Dir(path.Dir(filename)))
if !ok {
log.Fatal("get project root path failed")
}
}
func CreateTopic(topicName string, brokerAddr string, clusterName string, nameserver string) {
// use absolute path
command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName
if nameserver != "" {
command += " -n " + nameserver
}
if brokerAddr != "" {
command += " -b " + brokerAddr
}
if clusterName != "" {
command += " -c " + clusterName
}
fmt.Println(command)
out, err := exec.Command("/bin/bash", "-c", command).Output()
if err != nil {
log.Fatal(err)
}
fmt.Println(string(out))
}
func CreateDelayTopic(topicName string, brokerAddr string, clusterName string, nameserver string) {
// use absolute path
command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName
if nameserver != "" {
command += " -n " + nameserver
}
if brokerAddr != "" {
command += " -b " + brokerAddr
}
if clusterName != "" {
command += " -c " + clusterName
}
command += " -a " + "+message.type=DELAY"
fmt.Println(command)
out, err := exec.Command("/bin/bash", "-c", command).Output()
if err != nil {
log.Fatal(err)
}
fmt.Println(string(out))
}
func CreateFIFOTopic(topicName string, brokerAddr string, clusterName string, nameserver string) {
// use absolute path
command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName
if nameserver != "" {
command += " -n " + nameserver
}
if brokerAddr != "" {
command += " -b " + brokerAddr
}
if clusterName != "" {
command += " -c " + clusterName
}
command += " -a " + "+message.type=FIFO"
fmt.Println(command)
out, err := exec.Command("/bin/bash", "-c", command).Output()
if err != nil {
log.Fatal(err)
}
fmt.Println(string(out))
}
func CreateTransactionTopic(topicName string, brokerAddr string, clusterName string, nameserver string) {
// use absolute path
command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName
if nameserver != "" {
command += " -n " + nameserver
}
if brokerAddr != "" {
command += " -b " + brokerAddr
}
if clusterName != "" {
command += " -c " + clusterName
}
command += " -a " + "+message.type=TRANSACTION"
fmt.Println(command)
out, err := exec.Command("/bin/bash", "-c", command).Output()
if err != nil {
log.Fatal(err)
}
fmt.Println(string(out))
}
func CreateOrderlyConsumerGroup(consumerGroup string, brokerAddr string, clusterName string, nameserver string) {
// use absolute path
command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateSubGroup -g " + consumerGroup
if nameserver != "" {
command += " -n " + nameserver
}
if brokerAddr != "" {
command += " -b " + brokerAddr
}
if clusterName != "" {
command += " -c " + clusterName
}
command += " -s true -o true -m false -d false "
fmt.Println(command)
out, err := exec.Command("/bin/bash", "-c", command).Output()
if err != nil {
log.Fatal(err)
}
fmt.Println(string(out))
}
func ClusterList(nameserver string) string {
command := "sh " + root + "/rocketmq-admintools/bin/mqadmin clusterlist"
if nameserver != "" {
command += " -n " + nameserver
}
fmt.Println(command)
out, err := exec.Command("/bin/bash", "-c", command).Output()
if err != nil {
log.Fatal(err)
}
return string(out)
}