plugin/connector/standalone/consumer.go (156 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package standalone
import (
"context"
"fmt"
"sync"
ce "github.com/cloudevents/sdk-go/v2"
"github.com/pkg/errors"
"go.uber.org/atomic"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
)
type Consumer struct {
broker *Broker
subscribes map[string]*SubscribeWorker
committedOffset map[string]*atomic.Int64
mutex sync.Mutex
listener connector.EventListener
started atomic.Bool
}
func NewConsumer() *Consumer {
return &Consumer{
broker: GetBroker(),
subscribes: make(map[string]*SubscribeWorker),
committedOffset: make(map[string]*atomic.Int64),
}
}
func (c *Consumer) Subscribe(topicName string) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.IsClosed() {
return errors.New("fail to subscribe topic, consumer has been closed")
}
if _, ok := c.subscribes[topicName]; !ok {
err := c.broker.CreateNewQueueIfAbsent(topicName)
if err != nil {
return err
}
offset := atomic.NewInt64(0)
worker := &SubscribeWorker{
broker: broker,
topicName: topicName,
offset: offset,
listener: c.listener,
quit: make(chan struct{}, 1),
}
c.committedOffset[topicName] = offset
c.subscribes[topicName] = worker
go worker.run()
}
return nil
}
func (c *Consumer) Unsubscribe(topicName string) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if worker, ok := c.subscribes[topicName]; ok {
delete(c.subscribes, topicName)
worker.Stop()
}
return nil
}
func (c *Consumer) UpdateOffset(ctx context.Context, events []*ce.Event) error {
c.mutex.Lock()
defer c.mutex.Unlock()
for _, event := range events {
topicName := event.Subject()
offset := GetOffsetFromEvent(event)
if curOffset, ok := c.committedOffset[topicName]; ok {
if offset <= 0 {
return fmt.Errorf("fail to update offset, invalid param, topic %s, offset : %d", topicName, offset)
}
if offset < curOffset.Load() {
return nil
}
curOffset.Store(offset)
}
}
return nil
}
func (c *Consumer) RegisterEventListener(listener *connector.EventListener) {
c.listener = *listener
}
func (c *Consumer) InitConsumer(properties map[string]string) error {
// No-Op for standalone connector
return nil
}
func (c *Consumer) Start() error {
c.started.CAS(false, true)
return nil
}
func (c *Consumer) Shutdown() error {
c.started.CAS(true, false)
if ok := c.started.CAS(true, false); ok {
for topicName := range c.subscribes {
c.Unsubscribe(topicName)
delete(c.subscribes, topicName)
}
}
return nil
}
func (c *Consumer) IsStarted() bool {
return c.started.Load()
}
func (c *Consumer) IsClosed() bool {
return !c.started.Load()
}
// SubscribeWorker pollMessage from topic and manage consume offset
type SubscribeWorker struct {
topicName string
broker *Broker
listener connector.EventListener
offset *atomic.Int64
quit chan struct{}
}
func (w *SubscribeWorker) run() {
for {
select {
case <-w.quit:
return
default:
err := w.pollMessage()
if err != nil {
// retry
log.Error("[Standalone Consumer] fail to poll message from broker, err=%v", err)
continue
}
}
}
}
func (w *SubscribeWorker) pollMessage() error {
var message *Message
var err error
if w.offset.Load() == 0 {
message, err = w.broker.TakeMessage(w.topicName)
if ok := w.offset.CAS(0, message.GetOffset()); !ok {
return nil
}
} else {
message, err = w.broker.TakeMessageByOffset(w.topicName, w.offset.Load()+1)
}
if err != nil {
return errors.Wrap(err, "fail to take message from standalone broker")
}
commitFunc := func(action connector.EventMeshAction) error {
switch action {
case connector.CommitMessage:
// update offset
w.offset.Store(message.GetOffset())
case connector.ReconsumeLater:
// No-Op
case connector.ManualAck:
// update offset
w.offset.Store(message.GetOffset())
default:
}
return nil
}
w.listener.Consume(message.event, commitFunc)
return nil
}
func (w *SubscribeWorker) Stop() {
w.quit <- struct{}{}
}