func()

in v2/processor.go [175:199]


// startOne calls p.start for the receiver registered under receiverName,
// retrying a failed start until p.options.StartMaxAttempt attempts were made.
//
// After every failed attempt except the last one it waits for the delay
// returned by p.options.StartRetryDelayStrategy.GetDelay(attempt), where
// attempt is zero-based. The wait is abandoned as soon as ctx is done.
//
// It returns:
//   - an error if receiverName is not a key of p.receivers;
//   - nil as soon as p.start returns nil (errors from earlier attempts are
//     logged but not returned);
//   - ctx.Err() if ctx is done while waiting to retry (errors collected so far
//     are logged but not returned);
//   - otherwise the errors.Join of the errors of all attempts.
//
// If p.options.StartMaxAttempt is not positive, no attempt is made and nil is
// returned.
func (p *Processor) startOne(ctx context.Context, receiverName string) error {
	logger := getLogger(ctx)
	receiverEx, ok := p.receivers[receiverName]
	if !ok {
		return fmt.Errorf("processor %s not found", receiverName)
	}
	var savedError error
	for attempt := 0; attempt < p.options.StartMaxAttempt; attempt++ {
		err := p.start(ctx, receiverEx)
		if err == nil {
			// A nil result must end the retry loop. Falling through to the next
			// iteration would call p.start again and, if an earlier attempt had
			// failed, report those stale errors to the caller.
			// NOTE(review): treats nil from p.start as "no retry needed" — confirm
			// against p.start's contract.
			return nil
		}
		savedError = errors.Join(savedError, err)
		logger.Error(fmt.Sprintf("processor %s start attempt %d failed: %v", receiverName, attempt, err))
		if attempt+1 == p.options.StartMaxAttempt { // last attempt, skip the pointless wait
			break
		}
		// Use an explicit timer instead of time.After so it can be stopped when
		// ctx wins the race rather than staying pending for the full delay.
		retryTimer := time.NewTimer(p.options.StartRetryDelayStrategy.GetDelay(uint32(attempt)))
		select {
		case <-retryTimer.C:
			// Delay elapsed; proceed with the next attempt.
		case <-ctx.Done():
			retryTimer.Stop()
			logger.Info("context done, stop retrying")
			return ctx.Err()
		}
	}
	return savedError
}