// internal/elasticsearch/logstash/pipeline.go
package logstash

import (
	"context"
	"encoding/json"
	"fmt"
	"math"
	"regexp"
	"time"

	"github.com/elastic/terraform-provider-elasticstack/internal/clients"
	"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
	"github.com/elastic/terraform-provider-elasticstack/internal/models"
	"github.com/elastic/terraform-provider-elasticstack/internal/utils"
	"github.com/hashicorp/terraform-plugin-log/tflog"
	"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
)
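
// allSettingsKeys maps the centrally managed Logstash pipeline settings
// (pipeline.* and queue.*) to the Terraform schema type used to expose
// each of them as an individual resource attribute.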
var (
allSettingsKeys = map[string]schema.ValueType{
"pipeline.batch.delay": schema.TypeInt,
"pipeline.batch.size": schema.TypeInt,
"pipeline.ecs_compatibility": schema.TypeString,
"pipeline.ordered": schema.TypeString,
"pipeline.plugin_classloaders": schema.TypeBool,
"pipeline.unsafe_shutdown": schema.TypeBool,
"pipeline.workers": schema.TypeInt,
"queue.checkpoint.acks": schema.TypeInt,
"queue.checkpoint.retry": schema.TypeBool,
"queue.checkpoint.writes": schema.TypeInt,
"queue.drain": schema.TypeBool,
"queue.max_bytes": schema.TypeString,
"queue.max_events": schema.TypeInt,
"queue.page_capacity": schema.TypeString,
"queue.type": schema.TypeString,
}
)
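
// ResourceLogstashPipeline returns the schema.Resource that manages Logstash
// pipelines through Elasticsearch Centralized Pipeline Management.
//
// A minimal usage sketch; the resource type name below is illustrative,
// based on the provider's naming convention, and is not defined in this file:
//
//	resource "elasticstack_elasticsearch_logstash_pipeline" "example" {
//	  pipeline_id = "test_pipeline"
//	  pipeline    = <<-EOF
//	    input  { stdin {} }
//	    output { stdout {} }
//	  EOF
//	}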
func ResourceLogstashPipeline() *schema.Resource {
logstashPipelineSchema := map[string]*schema.Schema{
"id": {
Description: "Internal identifier of the resource",
Type: schema.TypeString,
Computed: true,
},
"pipeline_id": {
Description: "Identifier for the pipeline.",
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"description": {
Description: "Description of the pipeline.",
Type: schema.TypeString,
Optional: true,
},
"last_modified": {
Description: "Date the pipeline was last updated.",
Type: schema.TypeString,
Computed: true,
},
"pipeline": {
Description: "Configuration for the pipeline.",
Type: schema.TypeString,
Required: true,
},
"pipeline_metadata": {
Description: "Optional JSON metadata about the pipeline.",
Type: schema.TypeString,
ValidateFunc: validation.StringIsJSON,
DiffSuppressFunc: utils.DiffJsonSuppress,
Optional: true,
			Default:          `{"type":"logstash_pipeline","version":1}`,
},
// Pipeline Settings
"pipeline_batch_delay": {
Description: "Time in milliseconds to wait for each event before sending an undersized batch to pipeline workers.",
Type: schema.TypeInt,
Optional: true,
},
"pipeline_batch_size": {
Description: "The maximum number of events an individual worker thread collects before executing filters and outputs.",
Type: schema.TypeInt,
Optional: true,
},
"pipeline_ecs_compatibility": {
Description: "Sets the pipeline default value for ecs_compatibility, a setting that is available to plugins that implement an ECS compatibility mode for use with the Elastic Common Schema.",
Type: schema.TypeString,
ValidateFunc: validation.StringInSlice([]string{"disabled", "v1", "v8"}, false),
Optional: true,
},
"pipeline_ordered": {
Description: "Set the pipeline event ordering.",
Type: schema.TypeString,
ValidateFunc: validation.StringInSlice([]string{"auto", "true", "false"}, false),
Optional: true,
},
"pipeline_plugin_classloaders": {
Description: "(Beta) Load Java plugins in independent classloaders to isolate their dependencies.",
Type: schema.TypeBool,
Optional: true,
},
"pipeline_unsafe_shutdown": {
Description: "Forces Logstash to exit during shutdown even if there are still inflight events in memory.",
Type: schema.TypeBool,
Optional: true,
},
"pipeline_workers": {
Description: "The number of parallel workers used to run the filter and output stages of the pipeline.",
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validation.IntAtLeast(1),
},
"queue_checkpoint_acks": {
Description: "The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled.",
Type: schema.TypeInt,
Optional: true,
},
"queue_checkpoint_retry": {
Description: "When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried.",
Type: schema.TypeBool,
Optional: true,
},
"queue_checkpoint_writes": {
Description: "The maximum number of written events before forcing a checkpoint when persistent queues are enabled.",
Type: schema.TypeInt,
Optional: true,
},
"queue_drain": {
Description: "When enabled, Logstash waits until the persistent queue is drained before shutting down.",
Type: schema.TypeBool,
Optional: true,
},
"queue_max_bytes": {
Description: "Units for the total capacity of the queue when persistent queues are enabled.",
Type: schema.TypeString,
ValidateFunc: validation.StringMatch(regexp.MustCompile("^[0-9]+[kmgtp]?b$"), "must be valid size unit"),
Optional: true,
},
"queue_max_events": {
Description: "The maximum number of unread events in the queue when persistent queues are enabled.",
Type: schema.TypeInt,
Optional: true,
},
"queue_page_capacity": {
Description: "The size of the page data files used when persistent queues are enabled. The queue data consists of append-only data files separated into pages.",
Type: schema.TypeString,
Optional: true,
},
"queue_type": {
Description: "The internal queueing model for event buffering. Options are memory for in-memory queueing, or persisted for disk-based acknowledged queueing.",
Type: schema.TypeString,
ValidateFunc: validation.StringInSlice([]string{"memory", "persisted"}, false),
Optional: true,
},
// Pipeline Settings - End
"username": {
Description: "User who last updated the pipeline.",
Type: schema.TypeString,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("ELASTICSEARCH_USERNAME", "api_key"),
},
}
utils.AddConnectionSchema(logstashPipelineSchema)
return &schema.Resource{
Description: "Manage Logstash Pipelines via Centralized Pipeline Management. See, https://www.elastic.co/guide/en/elasticsearch/reference/current/logstash-apis.html",
CreateContext: resourceLogstashPipelinePut,
UpdateContext: resourceLogstashPipelinePut,
ReadContext: resourceLogstashPipelineRead,
DeleteContext: resourceLogstashPipelineDelete,
Importer: &schema.ResourceImporter{
StateContext: schema.ImportStatePassthroughContext,
},
Schema: logstashPipelineSchema,
}
}
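
// resourceLogstashPipelinePut creates or updates the pipeline; it backs both
// CreateContext and UpdateContext, since the underlying Elasticsearch
// Logstash pipeline API is an idempotent PUT.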
func resourceLogstashPipelinePut(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client, diags := clients.NewApiClientFromSDKResource(d, meta)
if diags.HasError() {
return diags
}
pipelineID := d.Get("pipeline_id").(string)
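	// Compose the resource ID (cluster plus pipeline ID) that is later stored
	// as the Terraform state ID.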
id, diags := client.ID(ctx, pipelineID)
if diags.HasError() {
return diags
}
var logstashPipeline models.LogstashPipeline
logstashPipeline.PipelineID = pipelineID
logstashPipeline.Description = d.Get("description").(string)
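	// Stamp the modification time in the strict date-time format expected by
	// the Elasticsearch API.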
logstashPipeline.LastModified = utils.FormatStrictDateTime(time.Now().UTC())
logstashPipeline.Pipeline = d.Get("pipeline").(string)
var pipelineMetadata map[string]interface{}
if err := json.Unmarshal([]byte(d.Get("pipeline_metadata").(string)), &pipelineMetadata); err != nil {
return diag.FromErr(err)
}
logstashPipeline.PipelineMetadata = pipelineMetadata
logstashPipeline.PipelineSettings = map[string]interface{}{}
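	// Fold any individually defined pipeline.* / queue.* attributes from the
	// configuration into a single settings map for the API request.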
if settings := utils.ExpandIndividuallyDefinedSettings(ctx, d, allSettingsKeys); len(settings) > 0 {
logstashPipeline.PipelineSettings = settings
}
logstashPipeline.Username = d.Get("username").(string)
if diags := elasticsearch.PutLogstashPipeline(ctx, client, &logstashPipeline); diags.HasError() {
return diags
}
d.SetId(id.String())
return resourceLogstashPipelineRead(ctx, d, meta)
}
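
// resourceLogstashPipelineRead refreshes the Terraform state from the
// Logstash pipeline returned by the Elasticsearch API.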
func resourceLogstashPipelineRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client, diags := clients.NewApiClientFromSDKResource(d, meta)
if diags.HasError() {
return diags
}
resourceID, diags := clients.ResourceIDFromStr(d.Id())
if diags.HasError() {
return diags
}
logstashPipeline, diags := elasticsearch.GetLogstashPipeline(ctx, client, resourceID)
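	// A nil pipeline with empty diagnostics means the pipeline no longer
	// exists upstream, so remove it from state and let Terraform plan a
	// re-creation.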
if logstashPipeline == nil && diags == nil {
tflog.Warn(ctx, fmt.Sprintf(`Logstash pipeline "%s" not found, removing from state`, resourceID))
d.SetId("")
return diags
}
if diags.HasError() {
return diags
}
if err := d.Set("pipeline_id", logstashPipeline.PipelineID); err != nil {
return diag.FromErr(err)
}
if err := d.Set("description", logstashPipeline.Description); err != nil {
return diag.FromErr(err)
}
if err := d.Set("last_modified", logstashPipeline.LastModified); err != nil {
return diag.FromErr(err)
}
if err := d.Set("pipeline", logstashPipeline.Pipeline); err != nil {
return diag.FromErr(err)
}
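	// Metadata is stored in state as a JSON string; the DiffJsonSuppress
	// function on the schema keeps formatting-only differences from showing
	// up as spurious diffs.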
pipelineMetadata, err := json.Marshal(logstashPipeline.PipelineMetadata)
if err != nil {
return diag.FromErr(err)
}
if err := d.Set("pipeline_metadata", string(pipelineMetadata)); err != nil {
return diag.FromErr(err)
}
	for key, typ := range allSettingsKeys {
		value, ok := logstashPipeline.PipelineSettings[key]
		if !ok {
			// The API response does not include this setting, so leave the
			// corresponding Terraform attribute unset.
			tflog.Warn(ctx, fmt.Sprintf("setting '%s' is not present in the API response and has been skipped", key))
			continue
		}
		// JSON numbers are unmarshalled as float64; convert integer-typed
		// settings back to int before writing them to state.
		if typ == schema.TypeInt {
			value = int(math.Round(value.(float64)))
		}
		if err := d.Set(utils.ConvertSettingsKeyToTFFieldKey(key), value); err != nil {
			return diag.FromErr(err)
		}
	}
if err := d.Set("username", logstashPipeline.Username); err != nil {
return diag.FromErr(err)
}
return nil
}
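
// resourceLogstashPipelineDelete removes the pipeline via the Elasticsearch
// Logstash pipeline API.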
func resourceLogstashPipelineDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client, diags := clients.NewApiClientFromSDKResource(d, meta)
if diags.HasError() {
return diags
}
resourceID, diags := clients.ResourceIDFromStr(d.Id())
if diags.HasError() {
return diags
}
if diags := elasticsearch.DeleteLogstashPipeline(ctx, client, resourceID); diags.HasError() {
return diags
}
return nil
}