func()

in pkg/bus/bus.go [179:218]


func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
	if topic.id == "" {
		return nil, errTopicEmpty
	}
	b.mutex.RLock()
	defer b.mutex.RUnlock()
	cc, exit := b.topics[topic]
	if !exit {
		return nil, ErrTopicNotExist
	}
	var f Future
	switch topic.typ {
	case chTypeUnidirectional:
		f = nil
	case chTypeBidirectional:
		f = &localFuture{retCount: len(message), retCh: make(chan Message)}
	}
	for _, each := range cc {
		for _, m := range message {
			go func(ch channel, message Message) {
				if !b.closer.AddRunning() {
					return
				}
				defer b.closer.Done()
				select {
				case <-b.closer.CloseNotify():
					return
				case ch <- event{
					m: message,
					f: f,
				}:
				}
			}(each, m)
		}
	}
	if f == nil {
		return &emptyFuture{}, nil
	}
	return f, nil
}