in admin/admin.go [223:273]
// DeleteTopic deletes a topic from the broker and then from the name servers.
//
// The effective configuration is defaultTopicConfigDelete() with opts applied
// in order. If no broker address is configured, the topic route is refreshed
// and the broker address is looked up by topic. If no name server addresses
// are configured, the route is refreshed and the name server client's address
// list is used.
//
// The broker deletion runs first; when it fails, the error is logged and
// returned without contacting any name server. Name servers are then handled
// in order, and the first failing deletion is logged and returned, leaving the
// remaining name servers untouched. Route refresh failures are only logged.
// DeleteTopic returns nil once every deletion has succeeded.
func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
	cfg := defaultTopicConfigDelete()
	for _, apply := range opts {
		apply(&cfg)
	}

	// Delete the topic in the broker.
	if cfg.BrokerAddr == "" {
		// Best-effort refresh so the lookup below sees current route data.
		// A failure is logged instead of being silently dropped; the
		// subsequent broker deletion still decides the returned error.
		if _, _, err := a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic); err != nil {
			rlog.Error("update topic route info error", map[string]interface{}{
				rlog.LogKeyTopic:         cfg.Topic,
				rlog.LogKeyUnderlayError: err,
			})
		}
		cfg.BrokerAddr = a.cli.GetNameSrv().FindBrokerAddrByTopic(cfg.Topic)
	}
	if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
		rlog.Error("delete topic in broker error", map[string]interface{}{
			rlog.LogKeyTopic:         cfg.Topic,
			rlog.LogKeyBroker:        cfg.BrokerAddr,
			rlog.LogKeyUnderlayError: err,
		})
		return err
	}

	// Delete the topic in the name servers.
	if len(cfg.NameSrvAddr) == 0 {
		// Refresh the route exactly once. The failure is logged but not
		// returned, so the address list is still used for the deletions below.
		if _, _, err := a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic); err != nil {
			rlog.Error("delete topic in nameserver error", map[string]interface{}{
				rlog.LogKeyTopic:         cfg.Topic,
				rlog.LogKeyUnderlayError: err,
			})
		}
		cfg.NameSrvAddr = a.cli.GetNameSrv().AddrList()
	}
	for _, nameSrvAddr := range cfg.NameSrvAddr {
		if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, nameSrvAddr); err != nil {
			rlog.Error("delete topic in nameserver error", map[string]interface{}{
				"nameServer":             nameSrvAddr,
				rlog.LogKeyTopic:         cfg.Topic,
				rlog.LogKeyUnderlayError: err,
			})
			return err
		}
	}

	rlog.Info("delete topic success", map[string]interface{}{
		"nameServer":      cfg.NameSrvAddr,
		rlog.LogKeyTopic:  cfg.Topic,
		rlog.LogKeyBroker: cfg.BrokerAddr,
	})
	return nil
}