in internal/trace.go [244:299]
func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
t := traceCfg.TraceTopic
if len(t) == 0 {
t = RmqSysTraceTopic
}
if traceCfg.Access == primitive.Cloud {
t = TraceTopicPrefix + traceCfg.TraceTopic
}
if len(traceCfg.NamesrvAddrs) == 0 && traceCfg.Resolver == nil {
panic("no NamesrvAddrs or Resolver configured")
}
var srvs *namesrvs
var err error
if len(traceCfg.NamesrvAddrs) > 0 {
srvs, err = NewNamesrv(primitive.NewPassthroughResolver(traceCfg.NamesrvAddrs), nil)
} else {
srvs, err = NewNamesrv(traceCfg.Resolver, nil)
}
if err != nil {
panic(errors.Wrap(err, "new Namesrv failed."))
}
if !traceCfg.Credentials.IsEmpty() {
srvs.SetCredentials(traceCfg.Credentials)
}
cliOp := DefaultClientOptions()
cliOp.GroupName = traceCfg.GroupName
cliOp.NameServerAddrs = traceCfg.NamesrvAddrs
cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT"
cliOp.RetryTimes = 0
cliOp.Namesrv = srvs
cliOp.Credentials = traceCfg.Credentials
cli := GetOrNewRocketMQClient(cliOp, nil)
if cli == nil {
return nil
}
cliOp.Namesrv = cli.GetNameSrv()
return &traceDispatcher{
ctx: ctx,
cancel: cancel,
traceTopic: t,
access: traceCfg.Access,
input: make(chan TraceContext, 1024),
batchCh: make(chan []*TraceContext, 2048),
cli: cli,
namesrvs: srvs,
}
}