in azkustoingest/managed.go [69:109]
// streamWithRetries performs a streaming ingestion through streamImpl,
// re-attempting it under the backoff policy in props.ManagedStreaming.Backoff,
// capped at retryCount retries and bound to ctx.
//
// payloadProvider is invoked once per attempt so that every attempt gets its
// own reader. When props.Streaming.ClientRequestId is empty, each attempt is
// tagged with a generated ID made of a per-call UUID and the attempt index;
// a caller-supplied ID is sent unchanged on every attempt.
//
// Return values:
//   - (result, nil) when an attempt succeeds.
//   - (nil, nil) when the final error is one errors.Retry reports as
//     retryable; the caller should fall back to queued ingestion.
//   - (nil, err) for any other error.
func (m *Managed) streamWithRetries(ctx context.Context, payloadProvider func() io.Reader, props properties.All, isBlobUri bool) (*Result, error) {
	var (
		result  *Result
		attempt int
	)
	callerSuppliedID := props.Streaming.ClientRequestId != ""
	managedUuid := uuid.New().String()
	policy := backoff.WithContext(backoff.WithMaxRetries(props.ManagedStreaming.Backoff, retryCount), ctx)

	operation := func() error {
		if !callerSuppliedID {
			// props is a by-value copy, so stamping a per-attempt ID here
			// does not touch the caller's properties.
			props.Streaming.ClientRequestId = fmt.Sprintf("KGC.executeManagedStreamingIngest;%s;%d", managedUuid, attempt)
		}

		var streamErr error
		result, streamErr = streamImpl(m.streaming.streamConn, ctx, payloadProvider(), props, isBlobUri)
		attempt++

		if streamErr == nil {
			return nil
		}
		// Only *errors.Error values that errors.Retry accepts are handed back
		// unwrapped, which lets the backoff loop try again.
		if kustoErr, ok := streamErr.(*errors.Error); ok && errors.Retry(kustoErr) {
			return streamErr
		}
		// Everything else stops the retry loop immediately.
		return backoff.Permanent(streamErr)
	}

	err := backoff.Retry(operation, policy)
	switch {
	case err == nil:
		return result, nil
	case errors.Retry(err):
		// Caller should fallback to queued
		return nil, nil
	default:
		return nil, err
	}
}