in pkg/bus/bus.go [179:218]
// Publish fans every message out to each channel registered under topic.
//
// It returns errTopicEmpty when topic.id is the empty string and
// ErrTopicNotExist when topic has no entry in b.topics. The lookup is done
// under the bus read lock, which is held until Publish returns.
//
// For a bidirectional topic the returned Future is a *localFuture whose
// retCount is len(message); the same Future is attached to every event that
// is sent. For a unidirectional topic the events carry a nil Future and the
// caller receives an *emptyFuture.
//
// Delivery is asynchronous: one goroutine is started per (channel, message)
// pair, so Publish itself does not wait for any send to complete.
func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
	if topic.id == "" {
		return nil, errTopicEmpty
	}

	b.mutex.RLock()
	defer b.mutex.RUnlock()

	subscribers, ok := b.topics[topic]
	if !ok {
		return nil, ErrTopicNotExist
	}

	// reply stays a nil interface unless the topic is bidirectional.
	var reply Future
	switch topic.typ {
	case chTypeUnidirectional:
		// No reply future for one-way topics.
	case chTypeBidirectional:
		reply = &localFuture{retCount: len(message), retCh: make(chan Message)}
	}

	// deliver pushes a single event onto dst. It gives up without sending
	// when the closer refuses to register it or signals CloseNotify first.
	deliver := func(dst channel, msg Message) {
		if !b.closer.AddRunning() {
			return
		}
		defer b.closer.Done()

		ev := event{m: msg, f: reply}
		select {
		case <-b.closer.CloseNotify():
		case dst <- ev:
		}
	}

	for _, dst := range subscribers {
		for _, msg := range message {
			go deliver(dst, msg)
		}
	}

	if reply != nil {
		return reply, nil
	}
	return &emptyFuture{}, nil
}