in plugin/connector/standalone/consumer.go [164:196]
func (w *SubscribeWorker) pollMessage() error {
var message *Message
var err error
if w.offset.Load() == 0 {
message, err = w.broker.TakeMessage(w.topicName)
if ok := w.offset.CAS(0, message.GetOffset()); !ok {
return nil
}
} else {
message, err = w.broker.TakeMessageByOffset(w.topicName, w.offset.Load()+1)
}
if err != nil {
return errors.Wrap(err, "fail to take message from standalone broker")
}
commitFunc := func(action connector.EventMeshAction) error {
switch action {
case connector.CommitMessage:
// update offset
w.offset.Store(message.GetOffset())
case connector.ReconsumeLater:
// No-Op
case connector.ManualAck:
// update offset
w.offset.Store(message.GetOffset())
default:
}
return nil
}
w.listener.Consume(message.event, commitFunc)
return nil
}