// internal/elasticsearch/ingest/pipeline.go

package ingest

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"

	"github.com/elastic/terraform-provider-elasticstack/internal/clients"
	"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
	"github.com/elastic/terraform-provider-elasticstack/internal/models"
	"github.com/elastic/terraform-provider-elasticstack/internal/utils"
	"github.com/hashicorp/terraform-plugin-log/tflog"
	"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
)

// ResourceIngestPipeline defines the Terraform resource schema and CRUD
// handlers for an Elasticsearch ingest pipeline.
// See: https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-apis.html
func ResourceIngestPipeline() *schema.Resource {
	pipelineSchema := map[string]*schema.Schema{
		"id": {
			Description: "Internal identifier of the resource",
			Type:        schema.TypeString,
			Computed:    true,
		},
		"name": {
			Description: "The name of the ingest pipeline.",
			Type:        schema.TypeString,
			Required:    true,
			ForceNew:    true,
		},
		"description": {
			Description: "Description of the ingest pipeline.",
			Type:        schema.TypeString,
			Optional:    true,
		},
		"on_failure": {
			Description: "Processors to run immediately after a processor failure. Each processor supports a processor-level `on_failure` value. If a processor without an `on_failure` value fails, Elasticsearch uses this pipeline-level parameter as a fallback. The processors in this parameter run sequentially in the order specified. Elasticsearch will not attempt to run the pipeline’s remaining processors. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html. Each record must be a valid JSON document",
			Type:        schema.TypeList,
			Optional:    true,
			MinItems:    1,
			Elem: &schema.Schema{
				Type:             schema.TypeString,
				ValidateFunc:     validation.StringIsJSON,
				DiffSuppressFunc: utils.DiffJsonSuppress,
			},
		},
		"processors": {
			Description: "Processors used to perform transformations on documents before indexing. Processors run sequentially in the order specified. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html. Each record must be a valid JSON document.",
			Type:        schema.TypeList,
			Required:    true,
			MinItems:    1,
			Elem: &schema.Schema{
				Type:             schema.TypeString,
				ValidateFunc:     validation.StringIsJSON,
				DiffSuppressFunc: utils.DiffJsonSuppress,
			},
		},
		"metadata": {
			Description:      "Optional user metadata about the index template.",
			Type:             schema.TypeString,
			Optional:         true,
			ValidateFunc:     validation.StringIsJSON,
			DiffSuppressFunc: utils.DiffJsonSuppress,
		},
	}

	utils.AddConnectionSchema(pipelineSchema)

	return &schema.Resource{
		Description: "Manages tasks and resources related to ingest pipelines and processors. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-apis.html",

		// Create and Update share the same PUT handler: the Elasticsearch
		// ingest pipeline API is an upsert.
		CreateContext: resourceIngestPipelineTemplatePut,
		UpdateContext: resourceIngestPipelineTemplatePut,
		ReadContext:   resourceIngestPipelineTemplateRead,
		DeleteContext: resourceIngestPipelineTemplateDelete,

		Importer: &schema.ResourceImporter{
			StateContext: schema.ImportStatePassthroughContext,
		},

		Schema: pipelineSchema,
	}
}

// expandProcessorList decodes a list of JSON document strings (as stored in
// Terraform state for `processors`/`on_failure`) into their map form for the
// Elasticsearch API model.
func expandProcessorList(raw []interface{}) ([]map[string]interface{}, error) {
	items := make([]map[string]interface{}, len(raw))
	for i, r := range raw {
		item := make(map[string]interface{})
		if err := json.NewDecoder(strings.NewReader(r.(string))).Decode(&item); err != nil {
			return nil, err
		}
		items[i] = item
	}
	return items, nil
}

// flattenProcessorList marshals processor definitions returned by
// Elasticsearch back into the JSON string form kept in Terraform state.
func flattenProcessorList(procs []map[string]interface{}) ([]string, error) {
	out := make([]string, len(procs))
	for i, p := range procs {
		res, err := json.Marshal(p)
		if err != nil {
			return nil, err
		}
		out[i] = string(res)
	}
	return out, nil
}

// resourceIngestPipelineTemplatePut creates or updates an ingest pipeline
// from the resource data, then re-reads it to populate state.
func resourceIngestPipelineTemplatePut(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
	client, diags := clients.NewApiClientFromSDKResource(d, meta)
	if diags.HasError() {
		return diags
	}

	pipelineId := d.Get("name").(string)
	id, diags := client.ID(ctx, pipelineId)
	if diags.HasError() {
		return diags
	}

	var pipeline models.IngestPipeline
	pipeline.Name = pipelineId

	if v, ok := d.GetOk("description"); ok {
		// Take a copy so the pointer does not alias the interface value.
		r := v.(string)
		pipeline.Description = &r
	}

	if v, ok := d.GetOk("on_failure"); ok {
		onFailure, err := expandProcessorList(v.([]interface{}))
		if err != nil {
			return diag.FromErr(err)
		}
		pipeline.OnFailure = onFailure
	}

	if v, ok := d.GetOk("processors"); ok {
		procs, err := expandProcessorList(v.([]interface{}))
		if err != nil {
			return diag.FromErr(err)
		}
		pipeline.Processors = procs
	}

	if v, ok := d.GetOk("metadata"); ok {
		metadata := make(map[string]interface{})
		if err := json.NewDecoder(strings.NewReader(v.(string))).Decode(&metadata); err != nil {
			return diag.FromErr(err)
		}
		pipeline.Metadata = metadata
	}

	if diags := elasticsearch.PutIngestPipeline(ctx, client, &pipeline); diags.HasError() {
		return diags
	}

	d.SetId(id.String())
	return resourceIngestPipelineTemplateRead(ctx, d, meta)
}

// resourceIngestPipelineTemplateRead refreshes state from the ingest pipeline
// stored in Elasticsearch, removing the resource from state when it no longer
// exists upstream.
func resourceIngestPipelineTemplateRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
	client, diags := clients.NewApiClientFromSDKResource(d, meta)
	if diags.HasError() {
		return diags
	}

	compId, diags := clients.CompositeIdFromStr(d.Id())
	if diags.HasError() {
		return diags
	}

	pipeline, diags := elasticsearch.GetIngestPipeline(ctx, client, &compId.ResourceId)
	if pipeline == nil && diags == nil {
		// Pipeline was deleted outside Terraform; drop it from state so the
		// next plan re-creates it.
		tflog.Warn(ctx, fmt.Sprintf(`Ingest pipeline "%s" not found, removing from state`, compId.ResourceId))
		d.SetId("")
		return diags
	}
	if diags.HasError() {
		return diags
	}

	if err := d.Set("name", pipeline.Name); err != nil {
		return diag.FromErr(err)
	}

	if desc := pipeline.Description; desc != nil {
		if err := d.Set("description", desc); err != nil {
			return diag.FromErr(err)
		}
	}

	if onFailure := pipeline.OnFailure; onFailure != nil {
		fProcs, err := flattenProcessorList(onFailure)
		if err != nil {
			return diag.FromErr(err)
		}
		if err := d.Set("on_failure", fProcs); err != nil {
			return diag.FromErr(err)
		}
	}

	procs, err := flattenProcessorList(pipeline.Processors)
	if err != nil {
		return diag.FromErr(err)
	}
	if err := d.Set("processors", procs); err != nil {
		return diag.FromErr(err)
	}

	if pipeline.Metadata != nil {
		// Named `metadata` (not `meta`) to avoid shadowing the provider
		// `meta interface{}` parameter.
		metadata, err := json.Marshal(pipeline.Metadata)
		if err != nil {
			return diag.FromErr(err)
		}
		if err := d.Set("metadata", string(metadata)); err != nil {
			return diag.FromErr(err)
		}
	}

	return diags
}

// resourceIngestPipelineTemplateDelete removes the ingest pipeline identified
// by the composite resource ID from Elasticsearch.
func resourceIngestPipelineTemplateDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
	client, diags := clients.NewApiClientFromSDKResource(d, meta)
	if diags.HasError() {
		return diags
	}

	compId, diags := clients.CompositeIdFromStr(d.Id())
	if diags.HasError() {
		return diags
	}

	if diags := elasticsearch.DeleteIngestPipeline(ctx, client, &compId.ResourceId); diags.HasError() {
		return diags
	}
	return diags
}