func NewTraceDispatcher()

in internal/trace.go [244:299]


// NewTraceDispatcher builds a traceDispatcher from traceCfg.
//
// The trace topic is traceCfg.TraceTopic, falling back to RmqSysTraceTopic
// when it is empty; with primitive.Cloud access the topic is instead
// TraceTopicPrefix + traceCfg.TraceTopic.
//
// It panics when neither NamesrvAddrs nor a Resolver is configured, or when
// NewNamesrv returns an error. It returns nil when GetOrNewRocketMQClient
// returns a nil client.
func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
	t := traceCfg.TraceTopic
	if len(t) == 0 {
		t = RmqSysTraceTopic
	}

	// NOTE(review): this uses traceCfg.TraceTopic rather than t, so with Cloud
	// access and an empty TraceTopic the result is the bare prefix and the
	// RmqSysTraceTopic fallback above is discarded — confirm this is intended.
	if traceCfg.Access == primitive.Cloud {
		t = TraceTopicPrefix + traceCfg.TraceTopic
	}

	if len(traceCfg.NamesrvAddrs) == 0 && traceCfg.Resolver == nil {
		panic("no NamesrvAddrs or Resolver configured")
	}

	// Explicit addresses take precedence over a configured Resolver.
	var srvs *namesrvs
	var err error
	if len(traceCfg.NamesrvAddrs) > 0 {
		srvs, err = NewNamesrv(primitive.NewPassthroughResolver(traceCfg.NamesrvAddrs), nil)
	} else {
		srvs, err = NewNamesrv(traceCfg.Resolver, nil)
	}

	if err != nil {
		panic(errors.Wrap(err, "new Namesrv failed."))
	}
	if !traceCfg.Credentials.IsEmpty() {
		srvs.SetCredentials(traceCfg.Credentials)
	}

	cliOp := DefaultClientOptions()
	cliOp.GroupName = traceCfg.GroupName
	cliOp.NameServerAddrs = traceCfg.NamesrvAddrs
	cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT"
	cliOp.RetryTimes = 0
	cliOp.Namesrv = srvs
	cliOp.Credentials = traceCfg.Credentials
	cli := GetOrNewRocketMQClient(cliOp, nil)
	if cli == nil {
		return nil
	}
	cliOp.Namesrv = cli.GetNameSrv()

	// Create the cancellable context only once every early-exit path (the
	// panics and the nil-client return) is behind us, so the cancel func is
	// never dropped without being handed to the dispatcher.
	ctx, cancel := context.WithCancel(context.Background())

	return &traceDispatcher{
		ctx:    ctx,
		cancel: cancel,

		traceTopic: t,
		access:     traceCfg.Access,
		input:      make(chan TraceContext, 1024),
		batchCh:    make(chan []*TraceContext, 2048),
		cli:        cli,
		namesrvs:   srvs,
	}
}