internal/clients/data_plane_client.go (469 lines of code) (raw):

package clients import ( "context" "errors" "fmt" "net/http" "net/url" "regexp" "slices" "strings" "sync" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" armpolicy "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/policy" armruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/runtime" "github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud" "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/terraform-provider-azapi/internal/retry" "github.com/Azure/terraform-provider-azapi/internal/services/parse" "github.com/cenkalti/backoff/v4" "github.com/hashicorp/terraform-plugin-log/tflog" ) type DataPlaneClient struct { credential azcore.TokenCredential clientOptions *arm.ClientOptions cachedPipelines map[string]runtime.Pipeline syncMux sync.Mutex } type DataPlaneClientRetryableErrors struct { client DataPlaneRequester // client is a Requester interface to allow mocking backoff *backoff.ExponentialBackOff // backoff is the backoff configuration for retrying errors []regexp.Regexp // errors is the list of errors regexp to retry on statusCodes []int // statusCodes is the list of status codes to retry on dataCallbackFuncs []func(interface{}) bool // dataCallbackFuncs is the list of functions to call to determine if the data is retryable } type DataPlaneRequester interface { CreateOrUpdateThenPoll(ctx context.Context, id parse.DataPlaneResourceId, body interface{}, options RequestOptions) (interface{}, error) Get(ctx context.Context, id parse.DataPlaneResourceId, options RequestOptions) (interface{}, error) DeleteThenPoll(ctx context.Context, id parse.DataPlaneResourceId, options RequestOptions) (interface{}, error) Action(ctx context.Context, resourceID string, action string, apiVersion string, method string, body interface{}, options RequestOptions) (interface{}, error) } var ( _ DataPlaneRequester = &DataPlaneClient{} _ DataPlaneRequester = &DataPlaneClientRetryableErrors{} ) func (retryclient *DataPlaneClientRetryableErrors) updateContext(ctx context.Context) context.Context { ctx = tflog.SetField(ctx, "backoff_max_elapsed_time", retryclient.backoff.MaxElapsedTime.String()) ctx = tflog.SetField(ctx, "backoff_initial_interval", retryclient.backoff.InitialInterval.String()) ctx = tflog.SetField(ctx, "backoff_max_interval", retryclient.backoff.MaxInterval.String()) ctx = tflog.SetField(ctx, "backoff_multiplier", retryclient.backoff.Multiplier) ctx = tflog.SetField(ctx, "backoff_randomization_factor", retryclient.backoff.RandomizationFactor) ctx = tflog.SetField(ctx, "retryable_http_status_codes", retryclient.statusCodes) ctx = tflog.SetField(ctx, "retryable_data_callback_funcs_length", len(retryclient.dataCallbackFuncs)) re := make([]string, len(retryclient.errors)) for i, r := range retryclient.errors { re[i] = r.String() } ctx = tflog.SetField(ctx, "retryable_errors", re) return ctx } // NewDataPlaneClientRetryableErrors creates a new ResourceClientRetryableErrors. func NewDataPlaneClientRetryableErrors(client DataPlaneRequester, bkof *backoff.ExponentialBackOff, errRegExps []regexp.Regexp, statusCodes []int, dataCallbackFuncs []func(any) bool) *DataPlaneClientRetryableErrors { rcre := &DataPlaneClientRetryableErrors{ client: client, backoff: bkof, errors: errRegExps, statusCodes: statusCodes, dataCallbackFuncs: dataCallbackFuncs, } rcre.backoff.Reset() return rcre } func NewDataPlaneClient(credential azcore.TokenCredential, opt *arm.ClientOptions) (*DataPlaneClient, error) { if opt == nil { opt = &arm.ClientOptions{} } return &DataPlaneClient{ credential: credential, clientOptions: opt, cachedPipelines: make(map[string]runtime.Pipeline), syncMux: sync.Mutex{}, }, nil } // WithRetry configures the retryable errors for the client. func (client *DataPlaneClient) WithRetry(bkof *backoff.ExponentialBackOff, errRegExps []regexp.Regexp, statusCodes []int, dataCallbackFuncs []func(interface{}) bool) *DataPlaneClientRetryableErrors { rcre := &DataPlaneClientRetryableErrors{ client: client, backoff: bkof, errors: errRegExps, statusCodes: statusCodes, dataCallbackFuncs: dataCallbackFuncs, } rcre.backoff.Reset() return rcre } func (client *DataPlaneClient) cachedPipeline(rawUrl string) (runtime.Pipeline, error) { client.syncMux.Lock() defer client.syncMux.Unlock() parsedUrl, err := url.Parse(rawUrl) if err != nil { return runtime.Pipeline{}, err } serviceName := cloud.ResourceManager cloud := client.clientOptions.Cloud host := parsedUrl.Host for name, serviceConfiguration := range cloud.Services { if strings.HasSuffix(host, strings.TrimPrefix(serviceConfiguration.Endpoint, "https://")) { serviceName = name break } } if pipeline, ok := client.cachedPipelines[string(serviceName)]; ok { return pipeline, nil } plOpt := runtime.PipelineOptions{} plOpt.APIVersion.Name = "api-version" authPolicy := armruntime.NewBearerTokenPolicy(client.credential, &armpolicy.BearerTokenOptions{Scopes: []string{cloud.Services[serviceName].Audience + "/.default"}}) plOpt.PerRetry = append(plOpt.PerRetry, authPolicy) pl := runtime.NewPipeline(moduleName, moduleVersion, plOpt, &client.clientOptions.ClientOptions) client.cachedPipelines[string(serviceName)] = pl return pl, nil } func (client *DataPlaneClient) CreateOrUpdateThenPoll(ctx context.Context, id parse.DataPlaneResourceId, body interface{}, options RequestOptions) (interface{}, error) { // build request urlPath := fmt.Sprintf("https://%s", id.AzureResourceId) req, err := runtime.NewRequest(ctx, http.MethodPut, urlPath) if err != nil { return nil, err } reqQP := req.Raw().URL.Query() reqQP.Set("api-version", id.ApiVersion) for key, value := range options.QueryParameters { reqQP.Set(key, value) } req.Raw().URL.RawQuery = reqQP.Encode() req.Raw().Header.Set("Accept", "application/json") for key, value := range options.Headers { req.Raw().Header.Set(key, value) } err = runtime.MarshalAsJSON(req, body) if err != nil { return nil, err } // send request pipeline, err := client.cachedPipeline(urlPath) if err != nil { return nil, err } resp, err := pipeline.Do(req) if err != nil { return nil, err } if !runtime.HasStatusCode(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted) { return nil, runtime.NewResponseError(resp) } // poll until done pt, err := runtime.NewPoller[interface{}](resp, pipeline, nil) if err == nil { resp, err := pt.PollUntilDone(ctx, &runtime.PollUntilDoneOptions{ Frequency: 10 * time.Second, }) return resp, err } // unmarshal response var responseBody interface{} if err := runtime.UnmarshalAsJSON(resp, &responseBody); err != nil { return nil, err } return responseBody, nil } func (client *DataPlaneClient) Get(ctx context.Context, id parse.DataPlaneResourceId, options RequestOptions) (interface{}, error) { // build request urlPath := fmt.Sprintf("https://%s", id.AzureResourceId) req, err := runtime.NewRequest(ctx, http.MethodGet, urlPath) if err != nil { return nil, err } reqQP := req.Raw().URL.Query() reqQP.Set("api-version", id.ApiVersion) for key, value := range options.QueryParameters { reqQP.Set(key, value) } req.Raw().URL.RawQuery = reqQP.Encode() req.Raw().Header.Set("Accept", "application/json") for key, value := range options.Headers { req.Raw().Header.Set(key, value) } // send request pipeline, err := client.cachedPipeline(urlPath) if err != nil { return nil, err } resp, err := pipeline.Do(req) if err != nil { return nil, err } if !runtime.HasStatusCode(resp, http.StatusOK) { return nil, runtime.NewResponseError(resp) } // unmarshal response var responseBody interface{} if err := runtime.UnmarshalAsJSON(resp, &responseBody); err != nil { return nil, err } return responseBody, nil } func (client *DataPlaneClient) DeleteThenPoll(ctx context.Context, id parse.DataPlaneResourceId, options RequestOptions) (interface{}, error) { // build request urlPath := fmt.Sprintf("https://%s", id.AzureResourceId) req, err := runtime.NewRequest(ctx, http.MethodDelete, urlPath) if err != nil { return nil, err } reqQP := req.Raw().URL.Query() reqQP.Set("api-version", id.ApiVersion) for key, value := range options.QueryParameters { reqQP.Set(key, value) } req.Raw().URL.RawQuery = reqQP.Encode() req.Raw().Header.Set("Accept", "application/json") for key, value := range options.Headers { req.Raw().Header.Set(key, value) } // send request pipeline, err := client.cachedPipeline(urlPath) if err != nil { return nil, err } resp, err := pipeline.Do(req) if err != nil { return nil, err } if !runtime.HasStatusCode(resp, http.StatusOK, http.StatusAccepted, http.StatusNoContent) { return nil, runtime.NewResponseError(resp) } // poll until done pt, err := runtime.NewPoller[interface{}](resp, pipeline, nil) if err == nil { resp, err := pt.PollUntilDone(ctx, &runtime.PollUntilDoneOptions{ Frequency: 10 * time.Second, }) return resp, err } // unmarshal response var responseBody interface{} if err := runtime.UnmarshalAsJSON(resp, &responseBody); err != nil { return nil, err } return responseBody, nil } func (client *DataPlaneClient) Action(ctx context.Context, resourceID string, action string, apiVersion string, method string, body interface{}, options RequestOptions) (interface{}, error) { // build request urlPath := fmt.Sprintf("https://%s", resourceID) if action != "" { urlPath = fmt.Sprintf("%s/%s", resourceID, action) } req, err := runtime.NewRequest(ctx, method, urlPath) if err != nil { return nil, err } reqQP := req.Raw().URL.Query() reqQP.Set("api-version", apiVersion) for key, value := range options.QueryParameters { reqQP.Set(key, value) } req.Raw().URL.RawQuery = reqQP.Encode() req.Raw().Header.Set("Accept", "application/json") for key, value := range options.Headers { req.Raw().Header.Set(key, value) } if method != "GET" && body != nil { err = runtime.MarshalAsJSON(req, body) } if err != nil { return nil, err } // send request pipeline, err := client.cachedPipeline(urlPath) if err != nil { return nil, err } resp, err := pipeline.Do(req) if err != nil { return nil, err } if !runtime.HasStatusCode(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted) { return nil, runtime.NewResponseError(resp) } // poll until done pt, err := runtime.NewPoller[interface{}](resp, pipeline, nil) if err == nil { resp, err := pt.PollUntilDone(ctx, &runtime.PollUntilDoneOptions{ Frequency: 10 * time.Second, }) return resp, err } // unmarshal response var responseBody interface{} contentType := resp.Header.Get("Content-Type") switch { case strings.Contains(contentType, "text/plain"): payload, err := runtime.Payload(resp) if err != nil { return nil, err } responseBody = string(payload) case strings.Contains(contentType, "application/json"): if err := runtime.UnmarshalAsJSON(resp, &responseBody); err != nil { return nil, err } default: } return responseBody, nil } func (retryclient *DataPlaneClientRetryableErrors) CreateOrUpdateThenPoll(ctx context.Context, id parse.DataPlaneResourceId, body interface{}, options RequestOptions) (interface{}, error) { if retryclient.backoff == nil { return nil, errors.New("retry is not configured, please call WithRetry() first") } ctx = tflog.SetField(ctx, "request", "CreateOrUpdateThenPoll") ctx = retryclient.updateContext(ctx) tflog.Debug(ctx, "retryclient: Begin") i := 0 op := backoff.OperationWithData[interface{}]( func() (interface{}, error) { data, err := retryclient.client.CreateOrUpdateThenPoll(ctx, id, body, options) if err != nil { if isDataPlaneRetryable(ctx, *retryclient, data, err) { tflog.Debug(ctx, "retryclient: Retry attempt", map[string]interface{}{ "err": err, "attempt": i, }) i++ return data, err } tflog.Debug(ctx, "retryclient: PermanentError", map[string]interface{}{ "err": err, "attempt": i, }) tflog.Debug(ctx, "retryclient: Success", map[string]interface{}{ "attempt": i, }) return nil, &backoff.PermanentError{Err: err} } return data, err }) exbo := backoff.WithContext(retryclient.backoff, ctx) return backoff.RetryWithData(op, exbo) } func (retryclient *DataPlaneClientRetryableErrors) Get(ctx context.Context, id parse.DataPlaneResourceId, options RequestOptions) (interface{}, error) { if retryclient.backoff == nil { return nil, errors.New("retry is not configured, please call WithRetry() first") } ctx = tflog.SetField(ctx, "request", "Get") ctx = retryclient.updateContext(ctx) tflog.Debug(ctx, "retryclient: Begin") i := 0 op := backoff.OperationWithData[interface{}]( func() (interface{}, error) { data, err := retryclient.client.Get(ctx, id, options) if err != nil { if isDataPlaneRetryable(ctx, *retryclient, data, err) { tflog.Debug(ctx, "retryclient: Retry attempt", map[string]interface{}{ "err": err, "attempt": i, }) i++ return data, err } tflog.Debug(ctx, "retryclient: PermanentError", map[string]interface{}{ "err": err, "attempt": i, }) return nil, &backoff.PermanentError{Err: err} } tflog.Debug(ctx, "retryclient: Success", map[string]interface{}{ "attempt": i, }) return data, err }) exbo := backoff.WithContext(retryclient.backoff, ctx) return backoff.RetryWithData(op, exbo) } func (retryclient *DataPlaneClientRetryableErrors) DeleteThenPoll(ctx context.Context, id parse.DataPlaneResourceId, options RequestOptions) (interface{}, error) { if retryclient.backoff == nil { return nil, errors.New("retry is not configured, please call WithRetry() first") } ctx = tflog.SetField(ctx, "request", "DeleteThenPoll") ctx = retryclient.updateContext(ctx) tflog.Debug(ctx, "retryclient: Begin") i := 0 op := backoff.OperationWithData[interface{}]( func() (interface{}, error) { data, err := retryclient.client.DeleteThenPoll(ctx, id, options) if err != nil { if isDataPlaneRetryable(ctx, *retryclient, data, err) { tflog.Debug(ctx, "retryclient: Retry attempt", map[string]interface{}{ "err": err, "attempt": i, }) i++ return data, err } tflog.Debug(ctx, "retryclient: PermanentError", map[string]interface{}{ "err": err, "attempt": i, }) return nil, &backoff.PermanentError{Err: err} } tflog.Debug(ctx, "retryclient: Success", map[string]interface{}{ "attempt": i, }) return data, err }) exbo := backoff.WithContext(retryclient.backoff, ctx) return backoff.RetryWithData(op, exbo) } func (retryclient *DataPlaneClientRetryableErrors) Action(ctx context.Context, resourceID string, action string, apiVersion string, method string, body interface{}, options RequestOptions) (interface{}, error) { if retryclient.backoff == nil { return nil, errors.New("retry is not configured, please call WithRetry() first") } ctx = tflog.SetField(ctx, "request", "Action") ctx = retryclient.updateContext(ctx) tflog.Debug(ctx, "retryclient: Begin") i := 0 op := backoff.OperationWithData[interface{}]( func() (interface{}, error) { data, err := retryclient.client.Action(ctx, resourceID, action, apiVersion, method, body, options) if err != nil { if isDataPlaneRetryable(ctx, *retryclient, data, err) { tflog.Debug(ctx, "retryclient: Retry attempt", map[string]interface{}{ "err": err, "attempt": i, }) i++ return data, err } tflog.Debug(ctx, "retryclient: PermanentError", map[string]interface{}{ "err": err, "attempt": i, }) return nil, &backoff.PermanentError{Err: err} } tflog.Debug(ctx, "retryclient: Success", map[string]interface{}{ "attempt": i, }) return data, err }) exbo := backoff.WithContext(retryclient.backoff, ctx) return backoff.RetryWithData(op, exbo) } func isDataPlaneRetryable(ctx context.Context, retryclient DataPlaneClientRetryableErrors, data interface{}, err error) bool { for _, e := range retryclient.errors { if e.MatchString(err.Error()) { tflog.Debug(ctx, "isDataPlaneRetryable: Error is retryable by regex", map[string]interface{}{ "err": err, "regexp": e.String(), }) return true } } var respErr *azcore.ResponseError if errors.As(err, &respErr) { if slices.Contains(retryclient.statusCodes, respErr.StatusCode) { tflog.Debug(ctx, "isDataPlaneRetryable: Error is retryable by status code", map[string]interface{}{ "err": err, "statusCode": respErr.StatusCode, }) return true } } for i, f := range retryclient.dataCallbackFuncs { if f(data) { tflog.Debug(ctx, "isDataPlaneRetryable: Error is retryable by function callback", map[string]interface{}{ "err": err, "callback_func_idx": i, }) return true } } return false } // ConfigureClientWithCustomRetry configures the client with a custom retry configuration if supplied. // If the retry configuration is null or unknown, it will use the default retry configuration. // If the supplied context has a deadline, it will use the deadline as the max elapsed time when a custom retry is provided. func (client *DataPlaneClient) ConfigureClientWithCustomRetry(ctx context.Context, rtry retry.RetryValue, useReadAfterCreateValues bool) DataPlaneRequester { backOff, errRegExps, statusCodes := configureCustomRetry(ctx, rtry, useReadAfterCreateValues) return client.WithRetry(backOff, errRegExps, statusCodes, nil) }