plugin/connector/standalone/broker.go (192 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package standalone import ( "errors" "fmt" "sync" "time" ce "github.com/cloudevents/sdk-go/v2" "go.uber.org/atomic" ) const ( defaultQueueSize = 1024 defaultExpireMills = 60 * 60 * 1000 ) // MessageQueue message storage of standalone broker type MessageQueue struct { capacity int items []*Message mutex sync.Mutex notFull sync.Cond newMsg sync.Cond } func NewMessageQueue() (*MessageQueue, error) { return NewMessageQueueWithCapacity(defaultQueueSize) } func NewMessageQueueWithCapacity(capacity int) (*MessageQueue, error) { if capacity <= 0 { return nil, errors.New("fail to create message queue: capacity must be positive") } queue := &MessageQueue{ capacity: capacity, items: make([]*Message, 0, capacity), } queue.notFull = sync.Cond{L: &queue.mutex} queue.newMsg = sync.Cond{L: &queue.mutex} return queue, nil } func (q *MessageQueue) Put(message *Message) { q.mutex.Lock() defer q.mutex.Unlock() // Wait util queue is not null for q.capacity == len(q.items) { q.notFull.Wait() } q.items = append(q.items, message) q.newMsg.Signal() } // Get fetch message in top of queue, block if message queue is empty func (q *MessageQueue) Get() (*Message, error) { q.mutex.Lock() defer q.mutex.Unlock() // Wait util queue is not empty for len(q.items) == 0 { q.newMsg.Wait() } return q.items[0], nil } // GetIfNotEmpty fetch message in top of queue, return nil if message queue is empty func (q *MessageQueue) GetIfNotEmpty() *Message { q.mutex.Lock() defer q.mutex.Unlock() if len(q.items) > 0 { return q.items[0] } return nil } // GetByOffset fetch message of curtain offset // return error if message has been deleted, block if message of that offset is not available currently func (q *MessageQueue) GetByOffset(offset int64) (*Message, error) { q.mutex.Lock() defer q.mutex.Unlock() if offset < 0 { return nil, fmt.Errorf("invalid offset param: %d", offset) } for { // Wait util queue is not empty if len(q.items) == 0 { q.newMsg.Wait() continue } lastMessage := q.items[len(q.items)-1] // Wait util new message has been put if lastMessage.GetOffset() < offset { q.newMsg.Wait() continue } firstMessage := q.items[0] // message has been deleted if firstMessage.GetOffset() > offset { return nil, fmt.Errorf("offset has been deleted, offset : %d", offset) } index := int(offset - firstMessage.GetOffset()) return q.items[index], nil } } // PopMessage remove message in the top, return nil if message queue is empty func (q *MessageQueue) PopMessage() *Message { q.mutex.Lock() defer q.mutex.Unlock() if len(q.items) != 0 { message := q.items[0] q.items = q.items[1:] return message } return nil } func (q *MessageQueue) Size() int { return len(q.items) } // Broker a manager of different topic message queue type Broker struct { // <topicName, messageQueue> queueContainer map[string]*MessageQueue // <topicName, offset> offsetContainer map[string]*atomic.Int64 // <topicName, worker> expireWorkers map[string]*MessageExpireWorker } var initOnce sync.Once var broker *Broker // GetBroker all topic shared same broker func GetBroker() *Broker { initOnce.Do(func() { broker = &Broker{ queueContainer: make(map[string]*MessageQueue), offsetContainer: make(map[string]*atomic.Int64), expireWorkers: make(map[string]*MessageExpireWorker), } }, ) return broker } func (b *Broker) TakeMessage(topicName string) (*Message, error) { if err := b.CreateNewQueueIfAbsent(topicName); err != nil { return nil, err } return b.queueContainer[topicName].Get() } func (b *Broker) TakeMessageByOffset(topicName string, offset int64) (*Message, error) { if err := b.CreateNewQueueIfAbsent(topicName); err != nil { return nil, err } return b.queueContainer[topicName].GetByOffset(offset) } func (b *Broker) PutMessage(topicName string, event *ce.Event) (message *Message, err error) { if err = b.CreateNewQueueIfAbsent(topicName); err != nil { return } message = &Message{ createTimeMills: time.Now().UnixMilli(), event: event, } message.SetOffset(b.offsetContainer[topicName].Add(1)) b.queueContainer[topicName].Put(message) return } func (b *Broker) CreateNewQueueIfAbsent(topicName string) (err error) { if _, ok := b.queueContainer[topicName]; ok { return } queue, err := NewMessageQueue() if err != nil { return } offset := atomic.NewInt64(0) b.queueContainer[topicName] = queue b.offsetContainer[topicName] = offset expireWorker := &MessageExpireWorker{ messageQueue: queue, quit: make(chan struct{}, 1), expireMills: defaultExpireMills, } b.expireWorkers[topicName] = expireWorker go expireWorker.schedule(5 * time.Second) return } func (b *Broker) ExistTopic(topicName string) (exist bool) { _, exist = b.queueContainer[topicName] return } // MessageExpireWorker periodically evict expire message type MessageExpireWorker struct { messageQueue *MessageQueue quit chan struct{} expireMills int64 } func (w *MessageExpireWorker) schedule(interval time.Duration) { ticket := time.NewTicker(interval) for { select { case <-ticket.C: w.doEvict() case <-w.quit: ticket.Stop() return } } } func (w *MessageExpireWorker) doEvict() { message := w.messageQueue.GetIfNotEmpty() for message != nil { if message.createTimeMills+w.expireMills < time.Now().UnixMilli() { w.messageQueue.PopMessage() } message = w.messageQueue.GetIfNotEmpty() } } func (w *MessageExpireWorker) stop() { w.quit <- struct{}{} }