func newRegexConsumer()

in pulsar/consumer_regex.go [66:119]


// newRegexConsumer builds a regexConsumer bound to the namespace of tn and the
// given topic pattern, subscribes to the topics reported by rc.topics(), and
// starts the rc.monitor goroutine.
//
// Error handling:
//   - If the topic lookup fails, that error is returned unchanged.
//   - If one or more subscriptions fail, every consumer that was created
//     successfully is closed and a single error is returned whose message
//     lists every failed topic (not just the last one). In that case no
//     ticker is created and no goroutine is started.
//
// The ticker period is opts.AutoDiscoveryPeriod, falling back to
// defaultAutoDiscoveryDuration when that value is zero or negative.
func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
	msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
	rc := &regexConsumer{
		client:    c,
		dlq:       dlq,
		rlq:       rlq,
		options:   opts,
		messageCh: msgCh,

		namespace: tn.Namespace,
		pattern:   pattern,

		consumers:     make(map[string]Consumer),
		subscribeCh:   make(chan []string, 1),
		unsubscribeCh: make(chan []string, 1),

		closeCh: make(chan struct{}),

		log:          c.log.SubLogger(log.Fields{"topic": tn.Name}),
		consumerName: opts.Name,
	}

	topics, err := rc.topics()
	if err != nil {
		return nil, err
	}

	var errs error
	for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) {
		if ce.err == nil {
			rc.consumers[ce.topic] = ce.consumer
			continue
		}
		if errs == nil {
			errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
			continue
		}
		// Keep the earlier failures in the message instead of overwriting
		// them; the wrapped error remains the most recent failure.
		errs = pkgerrors.Wrapf(ce.err, "%v; unable to subscribe to topic=%s", errs, ce.topic)
	}

	if errs != nil {
		// Partial success is not acceptable: release whatever was subscribed.
		for _, consumer := range rc.consumers {
			consumer.Close()
		}
		return nil, errs
	}

	// set up timer
	duration := opts.AutoDiscoveryPeriod
	if duration <= 0 {
		duration = defaultAutoDiscoveryDuration
	}
	rc.ticker = time.NewTicker(duration)

	go rc.monitor()

	return rc, nil
}