pkg/bus/bus.go (186 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package bus implements a message bus which is a common data model and a messaging infrastructure // to allow different modules to communicate locally or remotely. package bus import ( "context" "errors" "io" "sync" "time" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" ) type ( payload interface{} // MessageID the identity of a Message. MessageID uint64 // Future represents a future result of an asynchronous publishing. Future interface { Get() (Message, error) GetAll() ([]Message, error) } ) // Message is send on the bus to all subscribed listeners. type Message struct { payload payload nodeSelectors map[string][]string timeRange *modelv1.TimeRange node string id MessageID batchMode bool } // ID outputs the MessageID of the Message. func (m Message) ID() MessageID { return m.id } // Data returns the data wrapped in the Message. func (m Message) Data() interface{} { return m.payload } // Node returns the node name of the Message. func (m Message) Node() string { return m.node } // NodeSelectors returns the node selectors of the Message. func (m Message) NodeSelectors() map[string][]string { return m.nodeSelectors } // TimeRange returns the time range of the Message. func (m Message) TimeRange() *modelv1.TimeRange { return m.timeRange } // BatchModeEnabled returns whether the Message is sent in batch mode. func (m Message) BatchModeEnabled() bool { return m.batchMode } // NewMessage returns a new Message with a MessageID and embed data. func NewMessage(id MessageID, data interface{}) Message { return Message{id: id, node: "local", payload: data} } // NewBatchMessageWithNode returns a new Message with a MessageID and NodeID and embed data. func NewBatchMessageWithNode(id MessageID, node string, data interface{}) Message { return Message{id: id, node: node, payload: data, batchMode: true} } // NewMessageWithNode returns a new Message with a MessageID and NodeID and embed data. func NewMessageWithNode(id MessageID, node string, data interface{}) Message { return Message{id: id, node: node, payload: data} } // NewMessageWithNodeSelectors returns a new Message with a MessageID and NodeSelectors and embed data. // Nodes matching any of the selectors will receive the message. func NewMessageWithNodeSelectors(id MessageID, nodeSelectors map[string][]string, timeRange *modelv1.TimeRange, data interface{}) Message { return Message{id: id, nodeSelectors: nodeSelectors, timeRange: timeRange, payload: data} } // MessageListener is the signature of functions that can handle an EventMessage. type MessageListener interface { Rev(ctx context.Context, message Message) Message CheckHealth() *common.Error } // Subscriber allow subscribing a Topic's messages. type Subscriber interface { Subscribe(topic Topic, listener MessageListener) error } // Publisher allow sending Messages to a Topic. type Publisher interface { Publish(ctx context.Context, topic Topic, message ...Message) (Future, error) } // Broadcaster allow sending Messages to a Topic and receiving the responses. type Broadcaster interface { Broadcast(timeout time.Duration, topic Topic, message Message) ([]Future, error) } // UnImplementedHealthyListener is a listener that is not implemented. But it is healthy. type UnImplementedHealthyListener struct{} // CheckHealth always returns nil. func (h *UnImplementedHealthyListener) CheckHealth() *common.Error { return nil } // Rev always panics. func (h *UnImplementedHealthyListener) Rev(context.Context, Message) Message { panic("implement me") } type chType int var ( chTypeUnidirectional chType chTypeBidirectional chType = 1 ) // Topic is the object which messages are sent to or received from. type Topic struct { id string typ chType } // UniTopic returns an unary Topic. func UniTopic(id string) Topic { return Topic{id: id, typ: chTypeUnidirectional} } // BiTopic returns bidirectional Topic. func BiTopic(id string) Topic { return Topic{id: id, typ: chTypeBidirectional} } // String returns the string representation of the Topic. func (t Topic) String() string { return t.id } // The Bus allows publish-subscribe-style communication between components. type Bus struct { topics map[Topic][]MessageListener mutex sync.RWMutex } // NewBus returns a Bus. func NewBus() *Bus { b := new(Bus) b.topics = make(map[Topic][]MessageListener) return b } var ( // ErrTopicNotExist hints the topic published doesn't exist. ErrTopicNotExist = errors.New("the topic does not exist") errTopicEmpty = errors.New("the topic is empty") errListenerEmpty = errors.New("the message listener is empty") errEmptyFuture = errors.New("can't invoke Get() on an empty future") ) type emptyFuture struct{} func (e *emptyFuture) Get() (Message, error) { return Message{}, errEmptyFuture } func (e *emptyFuture) GetAll() ([]Message, error) { return nil, errEmptyFuture } type localFuture struct { messages []Message } func (l *localFuture) Get() (Message, error) { if len(l.messages) == 0 { return Message{}, io.EOF } m := l.messages[0] l.messages = l.messages[1:] return m, nil } func (l *localFuture) GetAll() ([]Message, error) { return l.messages, nil } // Publish sends Messages to a Topic. func (b *Bus) Publish(ctx context.Context, topic Topic, message ...Message) (Future, error) { if topic.id == "" { return nil, errTopicEmpty } b.mutex.RLock() defer b.mutex.RUnlock() mll, exit := b.topics[topic] if !exit { return nil, ErrTopicNotExist } var f *localFuture switch topic.typ { case chTypeUnidirectional: f = nil case chTypeBidirectional: f = &localFuture{messages: make([]Message, 0, len(message))} } var err error for _, ml := range mll { if e := ml.CheckHealth(); e != nil { err = multierr.Append(err, e) continue } for _, m := range message { if f != nil { f.messages = append(f.messages, ml.Rev(ctx, m)) } else { ml.Rev(ctx, m) } } } if f == nil { return &emptyFuture{}, err } return f, err } // Subscribe adds an MessageListener to be called when a message of a Topic is posted. func (b *Bus) Subscribe(topic Topic, listener MessageListener) error { if topic.id == "" { return errTopicEmpty } if listener == nil { return errListenerEmpty } b.mutex.Lock() defer b.mutex.Unlock() if _, exist := b.topics[topic]; !exist { b.topics[topic] = make([]MessageListener, 0) } list := b.topics[topic] list = append(list, listener) b.topics[topic] = list return nil } // Close a Bus until all Messages are sent to Subscribers. func (b *Bus) Close() { }