pkg/bus/bus.go (194 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package bus implements a message bus which is a common data model and a messaging infrastructure // to allow different modules to communicate locally or remotely. package bus import ( "errors" "io" "sync" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/pkg/run" ) type ( payload interface{} // MessageID the identity of a Message. MessageID uint64 // Future represents a future result of an asynchronous publishing. Future interface { Get() (Message, error) GetAll() ([]Message, error) } ) // Message is send on the bus to all subscribed listeners. type Message struct { payload payload id MessageID } // ID outputs the MessageID of the Message. func (m Message) ID() MessageID { return m.id } // Data returns the data wrapped in the Message. func (m Message) Data() interface{} { return m.payload } // NewMessage returns a new Message with a MessageID and embed data. func NewMessage(id MessageID, data interface{}) Message { return Message{id: id, payload: data} } // MessageListener is the signature of functions that can handle an EventMessage. type MessageListener interface { Rev(message Message) Message } // Subscriber allow subscribing a Topic's messages. type Subscriber interface { Subscribe(topic Topic, listener MessageListener) error } // Publisher allow sending Messages to a Topic. type Publisher interface { Publish(topic Topic, message ...Message) (Future, error) } type channel chan event type chType int var ( chTypeUnidirectional chType chTypeBidirectional chType = 1 ) // Topic is the object which messages are sent to or received from. type Topic struct { id string typ chType } // UniTopic returns an unary Topic. func UniTopic(id string) Topic { return Topic{id: id, typ: chTypeUnidirectional} } // BiTopic returns bidirectional Topic. func BiTopic(id string) Topic { return Topic{id: id, typ: chTypeBidirectional} } // The Bus allows publish-subscribe-style communication between components. type Bus struct { topics map[Topic][]channel closer *run.Closer mutex sync.RWMutex } // NewBus returns a Bus. func NewBus() *Bus { b := new(Bus) b.topics = make(map[Topic][]channel) b.closer = run.NewCloser(0) return b } var ( // ErrTopicNotExist hints the topic published doesn't exist. ErrTopicNotExist = errors.New("the topic does not exist") errTopicEmpty = errors.New("the topic is empty") errListenerEmpty = errors.New("the message listener is empty") errEmptyFuture = errors.New("can't invoke Get() on an empty future") ) type emptyFuture struct{} func (e *emptyFuture) Get() (Message, error) { return Message{}, errEmptyFuture } func (e *emptyFuture) GetAll() ([]Message, error) { return nil, errEmptyFuture } type localFuture struct { retCh chan Message retCount int } func (l *localFuture) Get() (Message, error) { if l.retCount < 1 { return Message{}, io.EOF } m, ok := <-l.retCh if ok { l.retCount-- return m, nil } return Message{}, io.EOF } func (l *localFuture) GetAll() ([]Message, error) { var globalErr error ret := make([]Message, 0, l.retCount) for { m, err := l.Get() if errors.Is(err, io.EOF) { return ret, globalErr } if err != nil { globalErr = multierr.Append(globalErr, err) continue } ret = append(ret, m) } } type event struct { f Future m Message } // Publish sends Messages to a Topic. func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) { if topic.id == "" { return nil, errTopicEmpty } b.mutex.RLock() defer b.mutex.RUnlock() cc, exit := b.topics[topic] if !exit { return nil, ErrTopicNotExist } var f Future switch topic.typ { case chTypeUnidirectional: f = nil case chTypeBidirectional: f = &localFuture{retCount: len(message), retCh: make(chan Message)} } for _, each := range cc { for _, m := range message { go func(ch channel, message Message) { if !b.closer.AddRunning() { return } defer b.closer.Done() select { case <-b.closer.CloseNotify(): return case ch <- event{ m: message, f: f, }: } }(each, m) } } if f == nil { return &emptyFuture{}, nil } return f, nil } // Subscribe adds an MessageListener to be called when a message of a Topic is posted. func (b *Bus) Subscribe(topic Topic, listener MessageListener) error { if topic.id == "" { return errTopicEmpty } if listener == nil { return errListenerEmpty } b.mutex.Lock() defer b.mutex.Unlock() if _, exist := b.topics[topic]; !exist { b.topics[topic] = make([]channel, 0) } ch := make(channel) list := b.topics[topic] list = append(list, ch) b.topics[topic] = list go func(listener MessageListener, ch channel) { for { c, ok := <-ch if ok { ret := listener.Rev(c.m) if c.f == nil { continue } if lf, ok := c.f.(*localFuture); ok { lf.retCh <- ret } } else { break } } }(listener, ch) return nil } // Close a Bus until all Messages are sent to Subscribers. func (b *Bus) Close() { b.closer.CloseThenWait() for _, chs := range b.topics { for _, ch := range chs { close(ch) } } }