internal/clients/elasticsearch/enrich.go (165 lines of code) (raw):

package elasticsearch import ( "bytes" "context" "encoding/json" "fmt" "net/http" "github.com/elastic/terraform-provider-elasticstack/internal/clients" "github.com/elastic/terraform-provider-elasticstack/internal/models" "github.com/elastic/terraform-provider-elasticstack/internal/utils" "github.com/hashicorp/terraform-plugin-log/tflog" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" ) type enrichPolicyResponse struct { Name string `json:"name"` Indices []string `json:"indices"` MatchField string `json:"match_field"` EnrichFields []string `json:"enrich_fields"` Query map[string]any `json:"query,omitempty"` } type enrichPoliciesResponse struct { Policies []struct { Config map[string]enrichPolicyResponse `json:"config"` } `json:"policies"` } var policyTypes = []string{"range", "match", "geo_match"} func getPolicyType(m map[string]enrichPolicyResponse) (string, error) { for _, policyType := range policyTypes { if _, ok := m[policyType]; ok { return policyType, nil } } return "", fmt.Errorf("did not find expected policy type") } func GetEnrichPolicy(ctx context.Context, apiClient *clients.ApiClient, policyName string) (*models.EnrichPolicy, diag.Diagnostics) { var diags diag.Diagnostics esClient, err := apiClient.GetESClient() if err != nil { return nil, diag.FromErr(err) } req := esClient.EnrichGetPolicy.WithName(policyName) res, err := esClient.EnrichGetPolicy(req, esClient.EnrichGetPolicy.WithContext(ctx)) if err != nil { return nil, diag.FromErr(err) } defer res.Body.Close() if res.StatusCode == http.StatusNotFound { return nil, nil } if diags := utils.CheckError(res, fmt.Sprintf("Unable to get requested EnrichPolicy: %s", policyName)); diags.HasError() { return nil, diags } var policies enrichPoliciesResponse if err := json.NewDecoder(res.Body).Decode(&policies); err != nil { return nil, diag.FromErr(err) } if len(policies.Policies) == 0 { return nil, diags } if len(policies.Policies) > 1 { tflog.Warn(ctx, fmt.Sprintf(`Somehow found more than one policy for policy named %s`, policyName)) } config := policies.Policies[0].Config policyType, err := getPolicyType(config) if err != nil { return nil, diag.FromErr(err) } policy := config[policyType] queryJSON, err := json.Marshal(policy.Query) if err != nil { return nil, diag.FromErr(err) } return &models.EnrichPolicy{ Type: policyType, Name: policy.Name, Indices: policy.Indices, MatchField: policy.MatchField, EnrichFields: policy.EnrichFields, Query: string(queryJSON), }, diags } func tryJSONUnmarshalString(s string) (any, bool) { var data any if err := json.Unmarshal([]byte(s), &data); err != nil { return s, false } return data, true } func PutEnrichPolicy(ctx context.Context, apiClient *clients.ApiClient, policy *models.EnrichPolicy) diag.Diagnostics { var diags diag.Diagnostics payloadPolicy := map[string]any{ "indices": policy.Indices, "enrich_fields": policy.EnrichFields, "match_field": policy.MatchField, } if query, ok := tryJSONUnmarshalString(policy.Query); ok { payloadPolicy["query"] = query } else if policy.Query != "" { tflog.Error(ctx, fmt.Sprintf("JAW: query did not unmarshall %s", policy.Query)) } payload := map[string]any{} payload[policy.Type] = payloadPolicy policyBytes, err := json.Marshal(payload) if err != nil { return diag.FromErr(err) } esClient, err := apiClient.GetESClient() if err != nil { return diag.FromErr(err) } res, err := esClient.EnrichPutPolicy(policy.Name, bytes.NewReader(policyBytes), esClient.EnrichPutPolicy.WithContext(ctx)) if err != nil { return diag.FromErr(err) } defer res.Body.Close() if diags := utils.CheckError(res, "Unable to create enrich policy"); diags.HasError() { return diags } return diags } func DeleteEnrichPolicy(ctx context.Context, apiClient *clients.ApiClient, policyName string) diag.Diagnostics { var diags diag.Diagnostics esClient, err := apiClient.GetESClient() if err != nil { return diag.FromErr(err) } res, err := esClient.EnrichDeletePolicy(policyName, esClient.EnrichDeletePolicy.WithContext(ctx)) if err != nil { return diag.FromErr(err) } defer res.Body.Close() if diags := utils.CheckError(res, fmt.Sprintf("Unable to delete enrich policy: %s", policyName)); diags.HasError() { return diags } return diags } func ExecuteEnrichPolicy(ctx context.Context, apiClient *clients.ApiClient, policyName string) diag.Diagnostics { var diags diag.Diagnostics esClient, err := apiClient.GetESClient() if err != nil { return diag.FromErr(err) } res, err := esClient.EnrichExecutePolicy( policyName, esClient.EnrichExecutePolicy.WithContext(ctx), esClient.EnrichExecutePolicy.WithWaitForCompletion(true), ) if err != nil { return diag.FromErr(err) } defer res.Body.Close() if res.StatusCode != http.StatusOK { return diag.Errorf(`Executing policy "%s" failed with http status %d`, policyName, res.StatusCode) } var response struct { Status struct { Phase string `json:"phase"` } `json:"status"` } if err := json.NewDecoder(res.Body).Decode(&response); err != nil { return diag.FromErr(err) } if response.Status.Phase != "COMPLETE" { return diag.Errorf(`Unexpected response to executing enrich policy: %s`, response.Status.Phase) } return diags }