pkg/event/bus.go (68 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"context"
"github.com/apache/servicecomb-service-center/pkg/queue"
"github.com/apache/servicecomb-service-center/pkg/util"
)
// Bus can fire the event aync and dispatch events to subscriber according to subject
type Bus struct {
*queue.TaskQueue
name string
subjects *util.ConcurrentMap
}
func (bus *Bus) Name() string {
return bus.name
}
func (bus *Bus) Fire(evt Event) {
// TODO add option if queue is full
bus.Add(queue.Task{Payload: evt})
}
func (bus *Bus) Handle(_ context.Context, payload interface{}) {
bus.fireAtOnce(payload.(Event))
}
func (bus *Bus) fireAtOnce(evt Event) {
if itf, ok := bus.subjects.Get(evt.Subject()); ok {
itf.(*Poster).Post(evt)
} // else the evt will be discard
}
func (bus *Bus) Subjects(name string) *Poster {
itf, ok := bus.subjects.Get(name)
if !ok {
return nil
}
return itf.(*Poster)
}
func (bus *Bus) AddSubscriber(n Subscriber) {
item, _ := bus.subjects.Fetch(n.Subject(), func() (interface{}, error) {
return NewPoster(n.Subject()), nil
})
item.(*Poster).GetOrNewGroup(n.Group()).AddMember(n)
}
func (bus *Bus) RemoveSubscriber(n Subscriber) {
itf, ok := bus.subjects.Get(n.Subject())
if !ok {
return
}
s := itf.(*Poster)
g := s.Groups(n.Group())
if g == nil {
return
}
g.RemoveMember(n.ID())
if g.Size() == 0 {
s.RemoveGroup(g.Name())
}
if s.Size() == 0 {
bus.subjects.Remove(s.Subject())
}
}
func (bus *Bus) Clear() {
bus.subjects.Clear()
}
func NewBus(name string, queueSize int) *Bus {
p := &Bus{
TaskQueue: queue.NewTaskQueue(queueSize),
name: name,
subjects: util.NewConcurrentMap(0),
}
p.AddWorker(p)
return p
}