internal/clients/elasticsearch/transform.go (256 lines of code) (raw):

package elasticsearch import ( "bytes" "context" "encoding/json" "fmt" "net/http" "time" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/elastic/terraform-provider-elasticstack/internal/clients" "github.com/elastic/terraform-provider-elasticstack/internal/models" "github.com/elastic/terraform-provider-elasticstack/internal/utils" "github.com/hashicorp/go-version" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" ) var transformFeatureMinSupportedVersion = version.Must(version.NewVersion("7.2.0")) var apiOperationTimeoutParamMinSupportedVersion = version.Must(version.NewVersion("7.17.0")) func PutTransform(ctx context.Context, apiClient *clients.ApiClient, transform *models.Transform, params *models.PutTransformParams) diag.Diagnostics { var diags diag.Diagnostics transformBytes, err := json.Marshal(transform) if err != nil { return diag.FromErr(err) } esClient, err := apiClient.GetESClient() if err != nil { return diag.FromErr(err) } serverVersion, diags := apiClient.ServerVersion(ctx) if diags.HasError() { return diags } if serverVersion.LessThan(transformFeatureMinSupportedVersion) { diags = append(diags, diag.Diagnostic{ Severity: diag.Error, Summary: "Transforms not supported", Detail: fmt.Sprintf(`Transform feature requires a minimum Elasticsearch version of "%s"`, transformFeatureMinSupportedVersion), }) return diags } withTimeout := serverVersion.GreaterThanOrEqual(apiOperationTimeoutParamMinSupportedVersion) putOptions := []func(*esapi.TransformPutTransformRequest){ esClient.TransformPutTransform.WithContext(ctx), esClient.TransformPutTransform.WithDeferValidation(params.DeferValidation), } if withTimeout { putOptions = append(putOptions, esClient.TransformPutTransform.WithTimeout(params.Timeout)) } res, err := esClient.TransformPutTransform(bytes.NewReader(transformBytes), transform.Name, putOptions...) if err != nil { return diag.FromErr(err) } defer res.Body.Close() if diags := utils.CheckError(res, fmt.Sprintf("Unable to create transform: %s", transform.Name)); diags.HasError() { return diags } if params.Enabled { var timeout time.Duration if withTimeout { timeout = params.Timeout } else { timeout = 0 } if diags := startTransform(ctx, esClient, transform.Name, timeout); diags.HasError() { return diags } } return diags } func GetTransform(ctx context.Context, apiClient *clients.ApiClient, name *string) (*models.Transform, diag.Diagnostics) { var diags diag.Diagnostics esClient, err := apiClient.GetESClient() if err != nil { return nil, diag.FromErr(err) } req := esClient.TransformGetTransform.WithTransformID(*name) res, err := esClient.TransformGetTransform(req, esClient.TransformGetTransform.WithContext(ctx)) if err != nil { return nil, diag.FromErr(err) } defer res.Body.Close() if res.StatusCode == http.StatusNotFound { return nil, nil } if diags := utils.CheckError(res, fmt.Sprintf("Unable to get requested transform: %s", *name)); diags.HasError() { return nil, diags } var transformsResponse models.GetTransformResponse if err := json.NewDecoder(res.Body).Decode(&transformsResponse); err != nil { return nil, diag.FromErr(err) } var foundTransform *models.Transform = nil for _, t := range transformsResponse.Transforms { if t.Id == *name { foundTransform = &t break } } if foundTransform == nil { diags = append(diags, diag.Diagnostic{ Severity: diag.Error, Summary: "Unable to find the transform in the cluster", Detail: fmt.Sprintf(`Unable to find "%s" transform in the cluster`, *name), }) return nil, diags } foundTransform.Name = *name return foundTransform, diags } func GetTransformStats(ctx context.Context, apiClient *clients.ApiClient, name *string) (*models.TransformStats, diag.Diagnostics) { var diags diag.Diagnostics esClient, err := apiClient.GetESClient() if err != nil { return nil, diag.FromErr(err) } getStatsOptions := []func(*esapi.TransformGetTransformStatsRequest){ esClient.TransformGetTransformStats.WithContext(ctx), } statsRes, err := esClient.TransformGetTransformStats(*name, getStatsOptions...) if err != nil { return nil, diag.FromErr(err) } defer statsRes.Body.Close() if diags := utils.CheckError(statsRes, fmt.Sprintf("Unable to get transform stats: %s", *name)); diags.HasError() { return nil, diags } var transformsStatsResponse models.GetTransformStatsResponse if err := json.NewDecoder(statsRes.Body).Decode(&transformsStatsResponse); err != nil { return nil, diag.FromErr(err) } var foundTransformStats *models.TransformStats = nil for _, ts := range transformsStatsResponse.TransformStats { if ts.Id == *name { foundTransformStats = &ts break } } if foundTransformStats == nil { diags = append(diags, diag.Diagnostic{ Severity: diag.Error, Summary: "Unable to find the transform stats in the cluster", Detail: fmt.Sprintf(`Unable to find "%s" transform stats in the cluster`, *name), }) return nil, diags } return foundTransformStats, diags } func UpdateTransform(ctx context.Context, apiClient *clients.ApiClient, transform *models.Transform, params *models.UpdateTransformParams) diag.Diagnostics { var diags diag.Diagnostics transformBytes, err := json.Marshal(transform) if err != nil { return diag.FromErr(err) } esClient, err := apiClient.GetESClient() if err != nil { return diag.FromErr(err) } serverVersion, diags := apiClient.ServerVersion(ctx) if diags.HasError() { return diags } if serverVersion.LessThan(transformFeatureMinSupportedVersion) { diags = append(diags, diag.Diagnostic{ Severity: diag.Error, Summary: "Transforms not supported", Detail: fmt.Sprintf(`Transform feature requires a minimum Elasticsearch version of "%s"`, transformFeatureMinSupportedVersion), }) return diags } withTimeout := serverVersion.GreaterThanOrEqual(apiOperationTimeoutParamMinSupportedVersion) updateOptions := []func(*esapi.TransformUpdateTransformRequest){ esClient.TransformUpdateTransform.WithContext(ctx), esClient.TransformUpdateTransform.WithDeferValidation(params.DeferValidation), } if withTimeout { updateOptions = append(updateOptions, esClient.TransformUpdateTransform.WithTimeout(params.Timeout)) } res, err := esClient.TransformUpdateTransform(bytes.NewReader(transformBytes), transform.Name, updateOptions...) if err != nil { return diag.FromErr(err) } defer res.Body.Close() if diags := utils.CheckError(res, fmt.Sprintf("Unable to update transform: %s", transform.Name)); diags.HasError() { return diags } var timeout time.Duration if withTimeout { timeout = params.Timeout } else { timeout = 0 } if params.ApplyEnabled { if params.Enabled { if diags := startTransform(ctx, esClient, transform.Name, timeout); diags.HasError() { return diags } } else { if diags := stopTransform(ctx, esClient, transform.Name, timeout); diags.HasError() { return diags } } } return diags } func DeleteTransform(ctx context.Context, apiClient *clients.ApiClient, name *string) diag.Diagnostics { var diags diag.Diagnostics esClient, err := apiClient.GetESClient() if err != nil { return diag.FromErr(err) } res, err := esClient.TransformDeleteTransform(*name, esClient.TransformDeleteTransform.WithForce(true), esClient.TransformDeleteTransform.WithContext(ctx)) if err != nil { return diag.FromErr(err) } defer res.Body.Close() if diags := utils.CheckError(res, fmt.Sprintf("Unable to delete transform: %s", *name)); diags.HasError() { return diags } return diags } func startTransform(ctx context.Context, esClient *elasticsearch.Client, transformName string, timeout time.Duration) diag.Diagnostics { var diags diag.Diagnostics startOptions := []func(*esapi.TransformStartTransformRequest){ esClient.TransformStartTransform.WithContext(ctx), } if timeout > 0 { startOptions = append(startOptions, esClient.TransformStartTransform.WithTimeout(timeout)) } startRes, err := esClient.TransformStartTransform(transformName, startOptions...) if err != nil { return diag.FromErr(err) } defer startRes.Body.Close() if diags := utils.CheckError(startRes, fmt.Sprintf("Unable to start transform: %s", transformName)); diags.HasError() { return diags } return diags } func stopTransform(ctx context.Context, esClient *elasticsearch.Client, transformName string, timeout time.Duration) diag.Diagnostics { var diags diag.Diagnostics stopOptions := []func(*esapi.TransformStopTransformRequest){ esClient.TransformStopTransform.WithContext(ctx), } if timeout > 0 { stopOptions = append(stopOptions, esClient.TransformStopTransform.WithTimeout(timeout)) } startRes, err := esClient.TransformStopTransform(transformName, stopOptions...) if err != nil { return diag.FromErr(err) } defer startRes.Body.Close() if diags := utils.CheckError(startRes, fmt.Sprintf("Unable to stop transform: %s", transformName)); diags.HasError() { return diags } return diags }