in banyand/queue/pub/pub.go [113:199]
// Broadcast publishes messages concurrently to every active node that is
// still registered and is matched by the message's node selectors, then
// collects the outcome of each publish.
//
// Node selection:
//   - When the message carries no node selectors, every active registered
//     node is targeted.
//   - Otherwise each selector group is evaluated in turn. A nil group uses
//     bypassMatches (presumably "match everything" — confirm against its
//     definition); a non-nil group is parsed with ParseLabelSelector, and a
//     node is targeted as soon as any matcher of the group accepts its labels.
//
// It returns the futures of the publishes that succeeded. An error is returned
// when there are no active nodes, a selector cannot be parsed, no node matches
// the selectors, or at least one publish failed. In the last case the futures
// of the successful publishes are still returned alongside the aggregated
// error. Publish errors recognized by isFailoverError additionally trigger
// p.failover for that node in a background goroutine tracked by p.closer.
func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Message) ([]bus.Future, error) {
	// Snapshot the active nodes under the read lock so the rest of the
	// function works on a stable list without holding the lock.
	var nodes []*databasev1.Node
	p.mu.RLock()
	for k := range p.active {
		if n := p.registered[k]; n != nil {
			nodes = append(nodes, n)
		}
	}
	p.mu.RUnlock()
	if len(nodes) == 0 {
		return nil, errors.New("no active nodes")
	}

	// names is a set, so a node matched by several selector groups is only
	// published to once.
	selectors := messages.NodeSelectors()
	names := make(map[string]struct{}, len(nodes))
	if len(selectors) == 0 {
		for _, n := range nodes {
			names[n.Metadata.GetName()] = struct{}{}
		}
	} else {
		for _, sel := range selectors {
			var matches []MatchFunc
			if sel == nil {
				matches = bypassMatches
			} else {
				for _, s := range sel {
					selector, err := ParseLabelSelector(s)
					if err != nil {
						return nil, fmt.Errorf("failed to parse node selector: %w", err)
					}
					matches = append(matches, selector.Matches)
				}
			}
			for _, n := range nodes {
				for _, m := range matches {
					if m(n.Labels) {
						// Use the nil-safe getter, consistent with the
						// no-selector branch above.
						names[n.Metadata.GetName()] = struct{}{}
						break
					}
				}
			}
		}
	}
	if l := p.log.Debug(); l.Enabled() {
		l.Msgf("broadcasting message to %s nodes", names)
	}
	if len(names) == 0 {
		return nil, fmt.Errorf("no nodes match the selector %v", selectors)
	}

	// The channel is buffered to the number of publishers so no sender can
	// block, even if the receiver below stops early.
	futureCh := make(chan publishResult, len(names))
	var wg sync.WaitGroup
	for n := range names {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			f, err := p.publish(timeout, topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
			futureCh <- publishResult{n: n, f: f, e: err}
		}(n)
	}
	// Close the channel once every publisher has reported so the collecting
	// loop below terminates.
	go func() {
		wg.Wait()
		close(futureCh)
	}()

	var futures []bus.Future
	var errs error
	for res := range futureCh {
		if res.e == nil {
			futures = append(futures, res.f)
			continue
		}
		errs = multierr.Append(errs, errors.Wrapf(res.e, "failed to publish message to %s", res.n))
		// AddRunning is only consulted for failover-worthy errors; when it
		// reports false the failover is skipped.
		if !isFailoverError(res.e) || !p.closer.AddRunning() {
			continue
		}
		// Pass the node name and error as arguments instead of capturing the
		// loop variable: with pre-1.22 loop semantics the variable is shared
		// across iterations and could already hold a different result (or a
		// nil error) by the time the goroutine runs.
		go func(node string, cause error) {
			defer p.closer.Done()
			p.failover(node, common.NewErrorWithStatus(modelv1.Status_STATUS_INTERNAL_ERROR, cause.Error()), topic)
		}(res.n, res.e)
	}
	if errs != nil {
		return futures, fmt.Errorf("broadcast errors: %w", errs)
	}
	return futures, nil
}