in plugin/connector/standalone/consumer.go [87:104]
// UpdateOffset advances the committed offset of every tracked topic that
// appears in events.
//
// For each event the topic is taken from the event subject and the offset from
// GetOffsetFromEvent. Events whose topic has no entry in c.committedOffset are
// ignored. For a tracked topic, an offset that is zero or negative is rejected
// with an error; an offset lower than the one already committed is skipped so
// that the committed offset never moves backwards; any other offset is stored.
//
// The consumer mutex is held for the whole call. If an error is returned,
// offsets stored for earlier events in the batch remain applied.
//
// ctx is currently unused.
func (c *Consumer) UpdateOffset(ctx context.Context, events []*ce.Event) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	for _, event := range events {
		topicName := event.Subject()
		offset := GetOffsetFromEvent(event)

		curOffset, ok := c.committedOffset[topicName]
		if !ok {
			// Topic is not tracked by this consumer; nothing to update.
			continue
		}
		if offset <= 0 {
			return fmt.Errorf("fail to update offset, invalid param, topic %s, offset : %d", topicName, offset)
		}
		if offset < curOffset.Load() {
			// Stale offset: skip only this event. Returning here would
			// silently drop the remaining events in the batch, which may
			// belong to other topics or carry newer offsets.
			continue
		}
		curOffset.Store(offset)
	}
	return nil
}