in plugin/connector/standalone/consumer.go [48:75]
// Subscribe registers this consumer for topicName and starts a background
// worker goroutine for that topic.
//
// The call holds c.mutex for its whole duration. If topicName already has an
// entry in c.subscribes, nothing is done and nil is returned.
//
// An error is returned when the consumer reports itself closed, or when the
// broker fails to create the queue for topicName; in both cases no worker is
// registered or started.
func (c *Consumer) Subscribe(topicName string) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if c.IsClosed() {
		return errors.New("fail to subscribe topic, consumer has been closed")
	}

	// Already subscribed: keep the existing worker and its offset untouched.
	if _, ok := c.subscribes[topicName]; ok {
		return nil
	}

	if err := c.broker.CreateNewQueueIfAbsent(topicName); err != nil {
		return err
	}

	// The same offset counter is shared between the worker and
	// c.committedOffset, so both observe a single value per topic.
	offset := atomic.NewInt64(0)
	worker := &SubscribeWorker{
		// Use the consumer's own broker (the one the queue was just created
		// on) rather than an unqualified `broker` identifier.
		broker:    c.broker,
		topicName: topicName,
		offset:    offset,
		listener:  c.listener,
		quit:      make(chan struct{}, 1),
	}
	c.committedOffset[topicName] = offset
	c.subscribes[topicName] = worker

	go worker.run()
	return nil
}