func()

in azkustoingest/managed.go [69:109]


func (m *Managed) streamWithRetries(ctx context.Context, payloadProvider func() io.Reader, props properties.All, isBlobUri bool) (*Result, error) {
	var result *Result

	hasCustomId := props.Streaming.ClientRequestId != ""
	i := 0
	managedUuid := uuid.New().String()

	actualBackoff := backoff.WithContext(backoff.WithMaxRetries(props.ManagedStreaming.Backoff, retryCount), ctx)

	var err error = nil
	err = backoff.Retry(func() error {
		if !hasCustomId {
			props.Streaming.ClientRequestId = fmt.Sprintf("KGC.executeManagedStreamingIngest;%s;%d", managedUuid, i)
		}
		result, err = streamImpl(m.streaming.streamConn, ctx, payloadProvider(), props, isBlobUri)
		i++
		if err != nil {
			if e, ok := err.(*errors.Error); ok {
				if errors.Retry(e) {
					return err
				} else {
					return backoff.Permanent(err)
				}
			} else {
				return backoff.Permanent(err)
			}
		}
		return nil
	}, actualBackoff)

	if err == nil {
		return result, nil
	}

	if errors.Retry(err) {
		// Caller should fallback to queued
		return nil, nil
	}

	return nil, err
}