in v2/processor.go [144:169]
func (p *Processor) Start(ctx context.Context) error {
wg := sync.WaitGroup{}
errChan := make(chan error, len(p.receivers))
for name := range p.receivers {
wg.Add(1)
go func(receiverName string) {
defer func() {
if rec := recover(); rec != nil {
errChan <- fmt.Errorf("panic recovered from processor %s: %v", receiverName, rec)
}
wg.Done()
}()
err := p.startOne(ctx, receiverName)
if err != nil {
errChan <- err
}
}(name)
}
wg.Wait()
close(errChan)
var allErrs []error
for err := range errChan {
allErrs = append(allErrs, err)
}
return errors.Join(allErrs...)
}