in pulsar/consumer_regex.go [66:119]
func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
rc := ®exConsumer{
client: c,
dlq: dlq,
rlq: rlq,
options: opts,
messageCh: msgCh,
namespace: tn.Namespace,
pattern: pattern,
consumers: make(map[string]Consumer),
subscribeCh: make(chan []string, 1),
unsubscribeCh: make(chan []string, 1),
closeCh: make(chan struct{}),
log: c.log.SubLogger(log.Fields{"topic": tn.Name}),
consumerName: opts.Name,
}
topics, err := rc.topics()
if err != nil {
return nil, err
}
var errs error
for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
rc.consumers[ce.topic] = ce.consumer
}
}
if errs != nil {
for _, c := range rc.consumers {
c.Close()
}
return nil, errs
}
// set up timer
duration := opts.AutoDiscoveryPeriod
if duration <= 0 {
duration = defaultAutoDiscoveryDuration
}
rc.ticker = time.NewTicker(duration)
go rc.monitor()
return rc, nil
}