func()

in plugin/connector/standalone/consumer.go [87:104]


// UpdateOffset advances the committed offset of each topic referenced by
// events. The whole batch is processed while holding c.mutex.
//
// For every event the topic name is taken from the event subject and the
// offset from GetOffsetFromEvent. Events whose topic has no entry in
// c.committedOffset are ignored. For a tracked topic:
//   - a non-positive offset aborts the batch with an error; offsets stored
//     for earlier events in the same batch are not rolled back;
//   - an offset lower than the currently committed one is skipped, and the
//     remaining events are still processed;
//   - otherwise the offset is stored as the new committed value.
//
// ctx is currently not consulted.
func (c *Consumer) UpdateOffset(ctx context.Context, events []*ce.Event) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	for _, event := range events {
		topicName := event.Subject()
		offset := GetOffsetFromEvent(event)

		curOffset, ok := c.committedOffset[topicName]
		if !ok {
			// Topic is not tracked by this consumer; nothing to update.
			continue
		}
		if offset <= 0 {
			return fmt.Errorf("fail to update offset, invalid param, topic %s, offset : %d", topicName, offset)
		}
		if offset < curOffset.Load() {
			// Stale offset: never move the committed position backwards, but
			// keep going so later events (possibly for other topics) in the
			// batch are not silently dropped.
			continue
		}
		curOffset.Store(offset)
	}
	return nil
}