func()

in plugin/connector/standalone/consumer.go [164:196]


func (w *SubscribeWorker) pollMessage() error {
	var message *Message
	var err error
	if w.offset.Load() == 0 {
		message, err = w.broker.TakeMessage(w.topicName)
		if ok := w.offset.CAS(0, message.GetOffset()); !ok {
			return nil
		}
	} else {
		message, err = w.broker.TakeMessageByOffset(w.topicName, w.offset.Load()+1)
	}
	if err != nil {
		return errors.Wrap(err, "fail to take message from standalone broker")
	}

	commitFunc := func(action connector.EventMeshAction) error {
		switch action {
		case connector.CommitMessage:
			// update offset
			w.offset.Store(message.GetOffset())
		case connector.ReconsumeLater:
			// No-Op
		case connector.ManualAck:
			// update offset
			w.offset.Store(message.GetOffset())
		default:
		}
		return nil
	}

	w.listener.Consume(message.event, commitFunc)
	return nil
}