func()

in plugin/connector/standalone/consumer.go [48:75]


// Subscribe registers the consumer for topicName and starts a background
// worker that delivers the topic's messages to the consumer's listener.
//
// The call holds c.mutex for its whole duration. It returns an error when the
// consumer has already been closed, or when the broker fails to create the
// queue for the topic. Subscribing to a topic that is already subscribed is a
// no-op and returns nil.
func (c *Consumer) Subscribe(topicName string) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if c.IsClosed() {
		return errors.New("fail to subscribe topic, consumer has been closed")
	}

	// Already subscribed: keep the existing worker and its offset untouched.
	if _, subscribed := c.subscribes[topicName]; subscribed {
		return nil
	}

	if err := c.broker.CreateNewQueueIfAbsent(topicName); err != nil {
		return err
	}

	// The same offset counter is handed to the worker and recorded in
	// committedOffset, so both observe the same value. It starts at 0.
	offset := atomic.NewInt64(0)
	worker := &SubscribeWorker{
		// Use the consumer's own broker so the worker reads from the same
		// broker on which the queue was just created.
		broker:    c.broker,
		topicName: topicName,
		offset:    offset,
		listener:  c.listener,
		quit:      make(chan struct{}, 1),
	}

	c.committedOffset[topicName] = offset
	c.subscribes[topicName] = worker
	go worker.run()
	return nil
}