admin/admin.go (223 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admin
import (
"context"
"fmt"
"sync"
"time"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
type Admin interface {
CreateTopic(ctx context.Context, opts ...OptionCreate) error
DeleteTopic(ctx context.Context, opts ...OptionDelete) error
GetAllSubscriptionGroup(ctx context.Context, brokerAddr string, timeoutMillis time.Duration) (*SubscriptionGroupWrapper, error)
FetchAllTopicList(ctx context.Context) (*TopicList, error)
//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
Close() error
}
// TODO: move outdated context to ctx
type adminOptions struct {
internal.ClientOptions
}
type AdminOption func(options *adminOptions)
func defaultAdminOptions() *adminOptions {
opts := &adminOptions{
ClientOptions: internal.DefaultClientOptions(),
}
opts.GroupName = "TOOLS_ADMIN"
opts.InstanceName = time.Now().String()
return opts
}
// WithResolver nameserver resolver to fetch nameserver addr
func WithResolver(resolver primitive.NsResolver) AdminOption {
return func(options *adminOptions) {
options.Resolver = resolver
}
}
func WithCredentials(c primitive.Credentials) AdminOption {
return func(options *adminOptions) {
options.ClientOptions.Credentials = c
}
}
// WithNamespace set the namespace of admin
func WithNamespace(namespace string) AdminOption {
return func(options *adminOptions) {
options.ClientOptions.Namespace = namespace
}
}
func WithTls(useTls bool) AdminOption {
return func(options *adminOptions) {
options.ClientOptions.RemotingClientConfig.UseTls = useTls
}
}
type admin struct {
cli internal.RMQClient
opts *adminOptions
closeOnce sync.Once
}
// NewAdmin initialize admin
func NewAdmin(opts ...AdminOption) (*admin, error) {
defaultOpts := defaultAdminOptions()
for _, opt := range opts {
opt(defaultOpts)
}
namesrv, err := internal.NewNamesrv(defaultOpts.Resolver, defaultOpts.RemotingClientConfig)
defaultOpts.Namesrv = namesrv
if err != nil {
return nil, err
}
if !defaultOpts.Credentials.IsEmpty() {
namesrv.SetCredentials(defaultOpts.Credentials)
}
cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
if cli == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
}
defaultOpts.Namesrv = cli.GetNameSrv()
//log.Printf("Client: %#v", namesrv.srvs)
return &admin{
cli: cli,
opts: defaultOpts,
}, nil
}
func (a *admin) GetAllSubscriptionGroup(ctx context.Context, brokerAddr string, timeoutMillis time.Duration) (*SubscriptionGroupWrapper, error) {
cmd := remote.NewRemotingCommand(internal.ReqGetAllSubscriptionGroupConfig, nil, nil)
a.cli.RegisterACL()
response, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, timeoutMillis)
if err != nil {
rlog.Error("Get all group list error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, err
} else {
rlog.Info("Get all group list success", map[string]interface{}{})
}
var subscriptionGroupWrapper SubscriptionGroupWrapper
_, err = subscriptionGroupWrapper.Decode(response.Body, &subscriptionGroupWrapper)
if err != nil {
rlog.Error("Get all group list decode error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, err
}
return &subscriptionGroupWrapper, nil
}
func (a *admin) FetchAllTopicList(ctx context.Context) (*TopicList, error) {
cmd := remote.NewRemotingCommand(internal.ReqGetAllTopicListFromNameServer, nil, nil)
response, err := a.cli.InvokeSync(ctx, a.cli.GetNameSrv().AddrList()[0], cmd, 3*time.Second)
if err != nil {
rlog.Error("Fetch all topic list error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, err
} else {
rlog.Info("Fetch all topic list success", map[string]interface{}{})
}
var topicList TopicList
_, err = topicList.Decode(response.Body, &topicList)
if err != nil {
rlog.Error("Fetch all topic list decode error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, err
}
return &topicList, nil
}
// CreateTopic create topic.
// TODO: another implementation like sarama, without brokerAddr as input
func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error {
cfg := defaultTopicConfigCreate()
for _, apply := range opts {
apply(&cfg)
}
request := &internal.CreateTopicRequestHeader{
Topic: cfg.Topic,
DefaultTopic: cfg.DefaultTopic,
ReadQueueNums: cfg.ReadQueueNums,
WriteQueueNums: cfg.WriteQueueNums,
Perm: cfg.Perm,
TopicFilterType: cfg.TopicFilterType,
TopicSysFlag: cfg.TopicSysFlag,
Order: cfg.Order,
}
cmd := remote.NewRemotingCommand(internal.ReqCreateTopic, request, nil)
_, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second)
if err != nil {
rlog.Error("create topic error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
rlog.LogKeyUnderlayError: err,
})
} else {
rlog.Info("create topic success", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
})
}
return err
}
// DeleteTopicInBroker delete topic in broker.
func (a *admin) deleteTopicInBroker(ctx context.Context, topic string, brokerAddr string) (*remote.RemotingCommand, error) {
request := &internal.DeleteTopicRequestHeader{
Topic: topic,
}
cmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInBroker, request, nil)
return a.cli.InvokeSync(ctx, brokerAddr, cmd, 5*time.Second)
}
// DeleteTopicInNameServer delete topic in nameserver.
func (a *admin) deleteTopicInNameServer(ctx context.Context, topic string, nameSrvAddr string) (*remote.RemotingCommand, error) {
request := &internal.DeleteTopicRequestHeader{
Topic: topic,
}
cmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInNameSrv, request, nil)
return a.cli.InvokeSync(ctx, nameSrvAddr, cmd, 5*time.Second)
}
// DeleteTopic delete topic in both broker and nameserver.
func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
cfg := defaultTopicConfigDelete()
for _, apply := range opts {
apply(&cfg)
}
//delete topic in broker
if cfg.BrokerAddr == "" {
a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
cfg.BrokerAddr = a.cli.GetNameSrv().FindBrokerAddrByTopic(cfg.Topic)
}
if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
rlog.Error("delete topic in broker error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
rlog.LogKeyUnderlayError: err,
})
return err
}
//delete topic in nameserver
if len(cfg.NameSrvAddr) == 0 {
a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
cfg.NameSrvAddr = a.cli.GetNameSrv().AddrList()
_, _, err := a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
if err != nil {
rlog.Error("delete topic in nameserver error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyUnderlayError: err,
})
}
cfg.NameSrvAddr = a.cli.GetNameSrv().AddrList()
}
for _, nameSrvAddr := range cfg.NameSrvAddr {
if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, nameSrvAddr); err != nil {
rlog.Error("delete topic in nameserver error", map[string]interface{}{
"nameServer": nameSrvAddr,
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyUnderlayError: err,
})
return err
}
}
rlog.Info("delete topic success", map[string]interface{}{
"nameServer": cfg.NameSrvAddr,
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
})
return nil
}
func (a *admin) FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) {
return a.cli.GetNameSrv().FetchPublishMessageQueues(utils.WrapNamespace(a.opts.Namespace, topic))
}
func (a *admin) Close() error {
a.closeOnce.Do(func() {
a.cli.Shutdown()
})
return nil
}