in consumer/push_consumer.go [78:131]
// NewPushConsumer builds a push consumer. It starts from the values returned
// by defaultPushConsumerOptions and applies every supplied Option on top of
// them, in order.
//
// An error is returned when the name server cannot be created from the
// configured resolver, or when no RocketMQ client could be obtained for the
// resulting client options.
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
	cfg := defaultPushConsumerOptions()
	for i := range opts {
		opts[i](&cfg)
	}

	nameSrv, err := internal.NewNamesrv(cfg.Resolver, cfg.RemotingClientConfig)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if !cfg.Credentials.IsEmpty() {
		nameSrv.SetCredentials(cfg.Credentials)
	}
	// Must be set before the client is requested below, because the client is
	// created from cfg.ClientOptions.
	cfg.Namesrv = nameSrv

	// With a namespace configured, the group name is prefixed as
	// "<namespace>%<group>".
	if ns := cfg.Namespace; ns != "" {
		cfg.GroupName = ns + "%" + cfg.GroupName
	}

	base := &defaultConsumer{
		client:         internal.GetOrNewRocketMQClient(cfg.ClientOptions, nil),
		consumerGroup:  cfg.GroupName,
		cType:          _PushConsume,
		state:          int32(internal.StateCreateJust),
		prCh:           make(chan PullRequest, 4),
		model:          cfg.ConsumerModel,
		consumeOrderly: cfg.ConsumeOrderly,
		fromWhere:      cfg.FromWhere,
		allocate:       cfg.Strategy,
		option:         cfg,
	}
	if base.client == nil {
		return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
	}
	// NOTE(review): this write lands on the local options value after it was
	// already stored in base.option above. Whether base.option observes it
	// depends on the type of that field, which is not visible here — confirm.
	cfg.Namesrv = base.client.GetNameSrv()

	consumer := &pushConsumer{
		defaultConsumer: base,
		subscribedTopic: make(map[string]string),
		queueLock:       newQueueLock(),
		done:            make(chan struct{}, 1),
		consumeFunc:     utils.NewSet(),
		crCh:            make(map[string]chan struct{}),
	}
	base.mqChanged = consumer.messageQueueChanged

	// Concurrent consumption is the default; orderly consumption replaces it
	// when requested through the options.
	consumer.submitToConsume = consumer.consumeMessageConcurrently
	if consumer.consumeOrderly {
		consumer.submitToConsume = consumer.consumeMessageOrderly
	}

	consumer.interceptor = primitive.ChainInterceptors(consumer.option.Interceptors...)
	return consumer, nil
}