in v2/processor.go [175:199]
// startOne starts the receiver registered under receiverName, retrying a
// failed start up to p.options.StartMaxAttempt times.
//
// It returns nil as soon as p.start returns nil. If every attempt fails, the
// returned error joins the errors of all attempts (see errors.Join). Between
// failed attempts it waits for the delay reported by
// p.options.StartRetryDelayStrategy for the current attempt index; if ctx is
// done during that wait, retrying stops and ctx.Err() is returned. An unknown
// receiverName yields an error without attempting a start.
func (p *Processor) startOne(ctx context.Context, receiverName string) error {
	logger := getLogger(ctx)
	receiverEx, ok := p.receivers[receiverName]
	if !ok {
		return fmt.Errorf("processor %s not found", receiverName)
	}
	var savedError error
	for attempt := 0; attempt < p.options.StartMaxAttempt; attempt++ {
		err := p.start(ctx, receiverEx)
		if err == nil {
			// A successful start ends the retry loop: do not invoke start
			// again, and do not report errors from earlier failed attempts.
			return nil
		}
		savedError = errors.Join(savedError, err)
		logger.Error(fmt.Sprintf("processor %s start attempt %d failed: %v", receiverName, attempt, err))
		if attempt+1 == p.options.StartMaxAttempt { // last attempt, skip the pointless wait
			break
		}
		// Use an explicit timer so it can be released promptly when the
		// context wins the race, instead of lingering until it fires.
		timer := time.NewTimer(p.options.StartRetryDelayStrategy.GetDelay(uint32(attempt)))
		select {
		case <-timer.C:
			// Delay elapsed; fall through to the next attempt.
		case <-ctx.Done():
			timer.Stop()
			logger.Info("context done, stop retrying")
			return ctx.Err()
		}
	}
	return savedError
}