internal/elasticsearch/index/ilm.go (711 lines of code) (raw):

package index

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"

	"github.com/elastic/terraform-provider-elasticstack/internal/clients"
	"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
	"github.com/elastic/terraform-provider-elasticstack/internal/models"
	"github.com/elastic/terraform-provider-elasticstack/internal/utils"
	"github.com/hashicorp/go-version"
	"github.com/hashicorp/terraform-plugin-log/tflog"
	"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
)

// supportedIlmPhases lists every ILM phase this resource can manage, in
// lifecycle order. It drives both schema construction and expand/flatten.
var supportedIlmPhases = [...]string{"hot", "warm", "cold", "frozen", "delete"}

// ResourceIlm returns the Terraform resource for an Elasticsearch index
// lifecycle management (ILM) policy. Each lifecycle phase is a single-item
// list block whose nested schema exposes only the actions Elasticsearch
// allows in that phase.
func ResourceIlm() *schema.Resource {
	// Every phase block satisfies the same at-least-one-of constraint: a
	// policy without any phase is rejected at plan time.
	atLeastOnePhase := []string{"hot", "warm", "cold", "frozen", "delete"}

	ilmSchema := map[string]*schema.Schema{
		"id": {
			Description: "Internal identifier of the resource",
			Type:        schema.TypeString,
			Computed:    true,
		},
		"name": {
			Description: "Identifier for the policy.",
			Type:        schema.TypeString,
			Required:    true,
			ForceNew:    true,
		},
		"metadata": {
			Description:      "Optional user metadata about the ilm policy. Must be valid JSON document.",
			Type:             schema.TypeString,
			Optional:         true,
			ValidateFunc:     validation.StringIsJSON,
			DiffSuppressFunc: utils.DiffJsonSuppress,
		},
		"hot": {
			Description:  "The index is actively being updated and queried.",
			Type:         schema.TypeList,
			Optional:     true,
			MaxItems:     1,
			AtLeastOneOf: atLeastOnePhase,
			Elem: &schema.Resource{
				Schema: getSchema("set_priority", "unfollow", "rollover", "readonly", "shrink", "forcemerge", "searchable_snapshot", "downsample"),
			},
		},
		"warm": {
			Description:  "The index is no longer being updated but is still being queried.",
			Type:         schema.TypeList,
			Optional:     true,
			MaxItems:     1,
			AtLeastOneOf: atLeastOnePhase,
			Elem: &schema.Resource{
				Schema: getSchema("set_priority", "unfollow", "readonly", "allocate", "migrate", "shrink", "forcemerge", "downsample"),
			},
		},
		"cold": {
			Description:  "The index is no longer being updated and is queried infrequently. The information still needs to be searchable, but it’s okay if those queries are slower.",
			Type:         schema.TypeList,
			Optional:     true,
			MaxItems:     1,
			AtLeastOneOf: atLeastOnePhase,
			Elem: &schema.Resource{
				Schema: getSchema("set_priority", "unfollow", "readonly", "searchable_snapshot", "allocate", "migrate", "freeze", "downsample"),
			},
		},
		"frozen": {
			Description:  "The index is no longer being updated and is queried rarely. The information still needs to be searchable, but it’s okay if those queries are extremely slow.",
			Type:         schema.TypeList,
			Optional:     true,
			MaxItems:     1,
			AtLeastOneOf: atLeastOnePhase,
			Elem: &schema.Resource{
				Schema: getSchema("searchable_snapshot"),
			},
		},
		"delete": {
			Description:  "The index is no longer needed and can safely be removed.",
			Type:         schema.TypeList,
			Optional:     true,
			MaxItems:     1,
			AtLeastOneOf: atLeastOnePhase,
			Elem: &schema.Resource{
				Schema: getSchema("wait_for_snapshot", "delete"),
			},
		},
		"modified_date": {
			Description: "The DateTime of the last modification.",
			Type:        schema.TypeString,
			Computed:    true,
		},
	}

	utils.AddConnectionSchema(ilmSchema)

	return &schema.Resource{
		Description: "Creates or updates lifecycle policy. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/ilm-put-lifecycle.html and https://www.elastic.co/guide/en/elasticsearch/reference/current/ilm-index-lifecycle.html",

		CreateContext: resourceIlmPut,
		UpdateContext: resourceIlmPut,
		ReadContext:   resourceIlmRead,
		DeleteContext: resourceIlmDelete,

		Importer: &schema.ResourceImporter{
			StateContext: schema.ImportStatePassthroughContext,
		},

		Schema: ilmSchema,
	}
}

// supportedActions maps every ILM action name to its nested Terraform schema.
// getSchema picks the subset of these that is valid for a given phase.
var supportedActions = map[string]*schema.Schema{
	"allocate": {
		Description: "Updates the index settings to change which nodes are allowed to host the index shards and change the number of replicas.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"number_of_replicas": {
					Description: "Number of replicas to assign to the index. Default: `0`",
					Type:        schema.TypeInt,
					Optional:    true,
					Default:     0,
				},
				"total_shards_per_node": {
					Description: "The maximum number of shards for the index on a single Elasticsearch node. Defaults to `-1` (unlimited). Supported from Elasticsearch version **7.16**",
					Type:        schema.TypeInt,
					Optional:    true,
					Default:     -1,
				},
				"include": {
					Description:      "Assigns an index to nodes that have at least one of the specified custom attributes. Must be valid JSON document.",
					Type:             schema.TypeString,
					Optional:         true,
					ValidateFunc:     validation.StringIsJSON,
					DiffSuppressFunc: utils.DiffJsonSuppress,
					Default:          "{}",
				},
				"exclude": {
					Description:      "Assigns an index to nodes that have none of the specified custom attributes. Must be valid JSON document.",
					Type:             schema.TypeString,
					Optional:         true,
					ValidateFunc:     validation.StringIsJSON,
					DiffSuppressFunc: utils.DiffJsonSuppress,
					Default:          "{}",
				},
				"require": {
					Description:      "Assigns an index to nodes that have all of the specified custom attributes. Must be valid JSON document.",
					Type:             schema.TypeString,
					Optional:         true,
					ValidateFunc:     validation.StringIsJSON,
					DiffSuppressFunc: utils.DiffJsonSuppress,
					Default:          "{}",
				},
			},
		},
	},
	"delete": {
		Description: "Permanently removes the index.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"delete_searchable_snapshot": {
					Description: "Deletes the searchable snapshot created in a previous phase.",
					Type:        schema.TypeBool,
					Optional:    true,
					Default:     true,
				},
			},
		},
	},
	"forcemerge": {
		Description: "Force merges the index into the specified maximum number of segments. This action makes the index read-only.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"max_num_segments": {
					Description:  "Number of segments to merge to. To fully merge the index, set to 1.",
					Type:         schema.TypeInt,
					Required:     true,
					ValidateFunc: validation.IntAtLeast(1),
				},
				"index_codec": {
					Description: "Codec used to compress the document store.",
					Type:        schema.TypeString,
					Optional:    true,
				},
			},
		},
	},
	"freeze": {
		Description: "Freeze the index to minimize its memory footprint.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"enabled": {
					Description: "Controls whether ILM freezes the index.",
					Type:        schema.TypeBool,
					Optional:    true,
					Default:     true,
				},
			},
		},
	},
	"migrate": {
		Description: `Moves the index to the data tier that corresponds to the current phase by updating the "index.routing.allocation.include._tier_preference" index setting.`,
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"enabled": {
					Description: "Controls whether ILM automatically migrates the index during this phase.",
					Type:        schema.TypeBool,
					Optional:    true,
					Default:     true,
				},
			},
		},
	},
	"readonly": {
		Description: "Makes the index read-only.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"enabled": {
					Description: "Controls whether ILM makes the index read-only.",
					Type:        schema.TypeBool,
					Optional:    true,
					Default:     true,
				},
			},
		},
	},
	"rollover": {
		Description: "Rolls over a target to a new index when the existing index meets one or more of the rollover conditions.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"max_age": {
					Description: "Triggers rollover after the maximum elapsed time from index creation is reached.",
					Type:        schema.TypeString,
					Optional:    true,
				},
				"max_docs": {
					Description: "Triggers rollover after the specified maximum number of documents is reached.",
					Type:        schema.TypeInt,
					Optional:    true,
				},
				"max_size": {
					Description: "Triggers rollover when the index reaches a certain size.",
					Type:        schema.TypeString,
					Optional:    true,
				},
				"max_primary_shard_docs": {
					Description: "Triggers rollover when the largest primary shard in the index reaches a certain number of documents. Supported from Elasticsearch version **8.2**",
					Type:        schema.TypeInt,
					Optional:    true,
				},
				"max_primary_shard_size": {
					Description: "Triggers rollover when the largest primary shard in the index reaches a certain size.",
					Type:        schema.TypeString,
					Optional:    true,
				},
				"min_age": {
					Description: "Prevents rollover until after the minimum elapsed time from index creation is reached. Supported from Elasticsearch version **8.4**",
					Type:        schema.TypeString,
					Optional:    true,
				},
				"min_docs": {
					Description: "Prevents rollover until after the specified minimum number of documents is reached. Supported from Elasticsearch version **8.4**",
					Type:        schema.TypeInt,
					Optional:    true,
				},
				"min_size": {
					Description: "Prevents rollover until the index reaches a certain size.",
					Type:        schema.TypeString,
					Optional:    true,
				},
				"min_primary_shard_docs": {
					Description: "Prevents rollover until the largest primary shard in the index reaches a certain number of documents. Supported from Elasticsearch version **8.4**",
					Type:        schema.TypeInt,
					Optional:    true,
				},
				"min_primary_shard_size": {
					Description: "Prevents rollover until the largest primary shard in the index reaches a certain size. Supported from Elasticsearch version **8.4**",
					Type:        schema.TypeString,
					Optional:    true,
				},
			},
		},
	},
	"searchable_snapshot": {
		Description: "Takes a snapshot of the managed index in the configured repository and mounts it as a searchable snapshot.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"snapshot_repository": {
					Description: "Repository used to store the snapshot.",
					Type:        schema.TypeString,
					Required:    true,
				},
				"force_merge_index": {
					Description: "Force merges the managed index to one segment.",
					Type:        schema.TypeBool,
					Optional:    true,
					Default:     true,
				},
			},
		},
	},
	"set_priority": {
		Description: "Sets the priority of the index as soon as the policy enters the hot, warm, or cold phase. Higher priority indices are recovered before indices with lower priorities following a node restart. Default priority is 1.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"priority": {
					Description:  "The priority for the index. Must be 0 or greater.",
					Type:         schema.TypeInt,
					Required:     true,
					ValidateFunc: validation.IntAtLeast(0),
				},
			},
		},
	},
	"shrink": {
		Description: "Sets a source index to read-only and shrinks it into a new index with fewer primary shards.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"number_of_shards": {
					Description: "Number of shards to shrink to.",
					Type:        schema.TypeInt,
					Optional:    true,
				},
				"max_primary_shard_size": {
					Description: "The max primary shard size for the target index.",
					Type:        schema.TypeString,
					Optional:    true,
				},
				"allow_write_after_shrink": {
					Description: "If true, the shrunken index is made writable by removing the write block.",
					Type:        schema.TypeBool,
					Optional:    true,
				},
			},
		},
	},
	"unfollow": {
		Description: "Convert a follower index to a regular index. Performed automatically before a rollover, shrink, or searchable snapshot action.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"enabled": {
					Description: "Controls whether ILM makes the follower index a regular one.",
					Type:        schema.TypeBool,
					Optional:    true,
					Default:     true,
				},
			},
		},
	},
	"wait_for_snapshot": {
		Description: "Waits for the specified SLM policy to be executed before removing the index. This ensures that a snapshot of the deleted index is available.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"policy": {
					Description: "Name of the SLM policy that the delete action should wait for.",
					Type:        schema.TypeString,
					Required:    true,
				},
			},
		},
	},
	"downsample": {
		Description: "Roll up documents within a fixed interval to a single summary document. Reduces the index footprint by storing time series data at reduced granularity.",
		Type:        schema.TypeList,
		Optional:    true,
		MaxItems:    1,
		Elem: &schema.Resource{
			Schema: map[string]*schema.Schema{
				"fixed_interval": {
					Description: "Downsampling interval",
					Type:        schema.TypeString,
					Required:    true,
				},
				"wait_timeout": {
					// Was a copy-paste of fixed_interval's description
					// ("Downsampling interval"); corrected to describe the
					// timeout semantics.
					Description: "How long ILM waits for a successful downsample operation before failing.",
					Type:        schema.TypeString,
					Optional:    true,
					Computed:    true,
				},
			},
		},
	},
}

// getSchema builds the nested schema for a single phase block. It selects the
// requested actions from supportedActions and always adds "min_age", which is
// valid in every phase.
func getSchema(actions ...string) map[string]*schema.Schema {
	sch := make(map[string]*schema.Schema, len(actions)+1)
	for _, a := range actions {
		if action, ok := supportedActions[a]; ok {
			sch[a] = action
		}
	}
	// min age can be set for all the phases
	sch["min_age"] = &schema.Schema{
		Description: "ILM moves indices through the lifecycle according to their age. To control the timing of these transitions, you set a minimum age for each phase.",
		Type:        schema.TypeString,
		Optional:    true,
		Computed:    true,
	}
	return sch
}

// resourceIlmPut creates or updates the ILM policy described by the resource
// data, then re-reads it to refresh state. Used for both Create and Update.
func resourceIlmPut(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
	client, diags := clients.NewApiClientFromSDKResource(d, meta)
	if diags.HasError() {
		return diags
	}
	ilmId := d.Get("name").(string)
	id, diags := client.ID(ctx, ilmId)
	if diags.HasError() {
		return diags
	}
	// The server version gates which settings may be sent (see
	// ilmActionSettingOptions).
	serverVersion, diags := client.ServerVersion(ctx)
	if diags.HasError() {
		return diags
	}
	policy, diags := expandIlmPolicy(d, serverVersion)
	if diags.HasError() {
		return diags
	}
	policy.Name = ilmId
	if diags := elasticsearch.PutIlm(ctx, client, policy); diags.HasError() {
		return diags
	}
	d.SetId(id.String())
	return resourceIlmRead(ctx, d, meta)
}

// expandIlmPolicy converts the Terraform resource data into the API policy
// model, expanding each configured phase in turn.
func expandIlmPolicy(d *schema.ResourceData, serverVersion *version.Version) (*models.Policy, diag.Diagnostics) {
	var diags diag.Diagnostics
	var policy models.Policy
	phases := make(map[string]models.Phase)

	policy.Name = d.Get("name").(string)
	if v, ok := d.GetOk("metadata"); ok {
		metadata := make(map[string]interface{})
		if err := json.NewDecoder(strings.NewReader(v.(string))).Decode(&metadata); err != nil {
			return nil, diag.FromErr(err)
		}
		policy.Metadata = metadata
	}

	for _, ph := range supportedIlmPhases {
		if v, ok := d.GetOk(ph); ok {
			phase, diags := expandPhase(v.([]interface{})[0].(map[string]interface{}), serverVersion)
			if diags.HasError() {
				return nil, diags
			}
			phases[ph] = *phase
		}
	}
	policy.Phases = phases

	return &policy, diags
}

// ilmActionSettings maps each plainly-expanded ILM action to the settings
// expandAction should copy out of its configuration block. Actions with
// special handling ("freeze", "readonly", "unfollow") are not listed here.
var ilmActionSettings = map[string][]string{
	"allocate":            {"number_of_replicas", "total_shards_per_node", "include", "exclude", "require"},
	"delete":              {"delete_searchable_snapshot"},
	"forcemerge":          {"max_num_segments", "index_codec"},
	"migrate":             {"enabled"},
	"rollover":            {"max_age", "max_docs", "max_size", "max_primary_shard_docs", "max_primary_shard_size", "min_age", "min_docs", "min_size", "min_primary_shard_docs", "min_primary_shard_size"},
	"searchable_snapshot": {"snapshot_repository", "force_merge_index"},
	"set_priority":        {"priority"},
	"shrink":              {"number_of_shards", "max_primary_shard_size", "allow_write_after_shrink"},
	"wait_for_snapshot":   {"policy"},
	"downsample":          {"fixed_interval", "wait_timeout"},
}

// expandPhase converts one phase's configuration map into the API phase
// model. "freeze", "readonly" and "unfollow" are boolean-gated: they are only
// sent to Elasticsearch when their "enabled" flag is true, since the API
// treats the mere presence of the action as enabling it.
func expandPhase(p map[string]interface{}, serverVersion *version.Version) (*models.Phase, diag.Diagnostics) {
	var diags diag.Diagnostics
	var phase models.Phase

	if v := p["min_age"].(string); v != "" {
		phase.MinAge = v
	}
	delete(p, "min_age")

	actions := make(map[string]models.Action)
	for actionName, action := range p {
		a := action.([]interface{})
		if len(a) == 0 {
			continue
		}
		switch actionName {
		case "freeze", "readonly", "unfollow":
			if a[0] != nil {
				ac := a[0].(map[string]interface{})
				if ac["enabled"].(bool) {
					actions[actionName], diags = expandAction(a, serverVersion)
				}
			}
		default:
			settings, ok := ilmActionSettings[actionName]
			if !ok {
				diags = append(diags, diag.Diagnostic{
					Severity: diag.Error,
					Summary:  "Unknown action defined.",
					Detail:   fmt.Sprintf(`Configured action "%s" is not supported`, actionName),
				})
				return nil, diags
			}
			actions[actionName], diags = expandAction(a, serverVersion, settings...)
		}
		// Bail out immediately: diags is reassigned by every iteration, so an
		// error from one action would otherwise be silently discarded when a
		// later action succeeds.
		if diags.HasError() {
			return nil, diags
		}
	}
	phase.Actions = actions

	return &phase, diags
}

var (
	RolloverMinConditionsMinSupportedVersion = version.Must(version.NewVersion("8.4.0"))
	MaxPrimaryShardDocsMinSupportedVersion   = version.Must(version.NewVersion("8.2.0"))
)

// ilmActionSettingOptions records per-setting expansion rules:
//   - skipEmptyCheck: copy the value even when it is the type's empty value
//     (e.g. number_of_replicas = 0 is meaningful).
//   - def: the default the provider assigns; a non-default value on a server
//     that is too old is an error, a default value is silently dropped.
//   - minVersion: the first Elasticsearch version supporting the setting.
var ilmActionSettingOptions = map[string]struct {
	skipEmptyCheck bool
	def            interface{}
	minVersion     *version.Version
}{
	"allow_write_after_shrink": {def: false, minVersion: version.Must(version.NewVersion("8.14.0"))},
	"number_of_replicas":       {skipEmptyCheck: true},
	"priority":                 {skipEmptyCheck: true},
	"max_primary_shard_docs":   {def: 0, minVersion: MaxPrimaryShardDocsMinSupportedVersion},
	"min_age":                  {def: "", minVersion: RolloverMinConditionsMinSupportedVersion},
	"min_docs":                 {def: 0, minVersion: RolloverMinConditionsMinSupportedVersion},
	"min_size":                 {def: "", minVersion: RolloverMinConditionsMinSupportedVersion},
	"min_primary_shard_docs":   {def: 0, minVersion: RolloverMinConditionsMinSupportedVersion},
	"min_primary_shard_size":   {def: "", minVersion: RolloverMinConditionsMinSupportedVersion},
	"total_shards_per_node":    {skipEmptyCheck: true, def: -1, minVersion: version.Must(version.NewVersion("7.16.0"))},
}

// expandAction copies the named settings out of a single action's
// configuration block into the map sent to the ILM API, applying the
// version-gating and empty-value rules from ilmActionSettingOptions. The
// "include"/"exclude"/"require" settings are stored as JSON strings in state
// and are decoded into objects here.
func expandAction(a []interface{}, serverVersion *version.Version, settings ...string) (map[string]interface{}, diag.Diagnostics) {
	var diags diag.Diagnostics

	def := make(map[string]interface{})
	if action := a[0]; action != nil {
		for _, setting := range settings {
			if v, ok := action.(map[string]interface{})[setting]; ok && v != nil {
				options := ilmActionSettingOptions[setting]
				if options.minVersion != nil && options.minVersion.GreaterThan(serverVersion) {
					if v != options.def {
						// %v: the default may be an int or bool, which %s
						// would render as "%!s(int=0)".
						return nil, diag.Errorf("[%s] is not supported in the target Elasticsearch server. Remove the setting from your module definition or set it to the default [%v] value", setting, options.def)
					}
					// This setting is not supported, and shouldn't be set in the ILM policy object
					continue
				}
				if options.skipEmptyCheck || !utils.IsEmpty(v) {
					// these 3 fields must be treated as JSON objects
					if setting == "include" || setting == "exclude" || setting == "require" {
						res := make(map[string]interface{})
						if err := json.Unmarshal([]byte(v.(string)), &res); err != nil {
							return nil, diag.FromErr(err)
						}
						def[setting] = res
					} else {
						def[setting] = v
					}
				}
			}
		}
	}

	return def, diags
}

// resourceIlmRead fetches the ILM policy from Elasticsearch and writes it back
// into Terraform state. A missing policy clears the ID so Terraform plans a
// re-create instead of erroring.
func resourceIlmRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
	client, diags := clients.NewApiClientFromSDKResource(d, meta)
	if diags.HasError() {
		return diags
	}
	id := d.Id()
	compId, diags := clients.CompositeIdFromStr(id)
	if diags.HasError() {
		return diags
	}
	policyId := compId.ResourceId

	ilmDef, diags := elasticsearch.GetIlm(ctx, client, policyId)
	if ilmDef == nil && diags == nil {
		tflog.Warn(ctx, fmt.Sprintf(`ILM policy "%s" not found, removing from state`, compId.ResourceId))
		d.SetId("")
		return diags
	}
	if diags.HasError() {
		return diags
	}

	if err := d.Set("modified_date", ilmDef.Modified); err != nil {
		return diag.FromErr(err)
	}
	if ilmDef.Policy.Metadata != nil {
		metadata, err := json.Marshal(ilmDef.Policy.Metadata)
		if err != nil {
			return diag.FromErr(err)
		}
		if err := d.Set("metadata", string(metadata)); err != nil {
			return diag.FromErr(err)
		}
	}
	if err := d.Set("name", policyId); err != nil {
		return diag.FromErr(err)
	}
	for _, ph := range supportedIlmPhases {
		if v, ok := ilmDef.Policy.Phases[ph]; ok {
			phase, diags := flattenPhase(ph, v, d)
			if diags.HasError() {
				return diags
			}
			if err := d.Set(ph, phase); err != nil {
				return diag.FromErr(err)
			}
		}
	}

	return diags
}

// flattenPhase converts one API phase back into the single-item list form
// Terraform state expects. Boolean-gated actions ("readonly", "freeze",
// "unfollow") need special care: the API omits them entirely when disabled,
// so a configured-but-disabled block is reconstructed from the plan to avoid
// a perpetual diff.
func flattenPhase(phaseName string, p models.Phase, d *schema.ResourceData) (interface{}, diag.Diagnostics) {
	var diags diag.Diagnostics
	out := make([]interface{}, 1)
	phase := make(map[string]interface{})

	planned := make(map[string]interface{})
	_, newVal := d.GetChange(phaseName)
	if newVal != nil && len(newVal.([]interface{})) > 0 {
		planned = newVal.([]interface{})[0].(map[string]interface{})
	}
	existsAndNotEmpty := func(key string, m map[string]interface{}) bool {
		if v, ok := m[key]; ok && len(v.([]interface{})) > 0 {
			return true
		}
		return false
	}
	// Pre-populate boolean-gated actions that appear in the plan as disabled;
	// the actions loop below flips them to enabled when the API returns them.
	// Each entry gets its own map: sharing one map here would let a later
	// `enabled = true` assignment retroactively mutate a sibling action that
	// was stored as disabled.
	for _, aCase := range []string{"readonly", "freeze", "unfollow"} {
		if existsAndNotEmpty(aCase, planned) {
			phase[aCase] = []interface{}{map[string]interface{}{"enabled": false}}
		}
	}

	if p.MinAge != "" {
		phase["min_age"] = p.MinAge
	}
	for actionName, action := range p.Actions {
		switch actionName {
		case "readonly", "freeze", "unfollow":
			phase[actionName] = []interface{}{map[string]interface{}{"enabled": true}}
		case "allocate":
			allocateAction := make(map[string]interface{})
			if v, ok := action["number_of_replicas"]; ok {
				allocateAction["number_of_replicas"] = v
			}
			if v, ok := action["total_shards_per_node"]; ok {
				allocateAction["total_shards_per_node"] = v
			} else {
				// Specify the default for total_shards_per_node. This avoids an endless diff loop for ES 7.15 or lower which don't support this setting
				allocateAction["total_shards_per_node"] = -1
			}
			for _, f := range []string{"include", "require", "exclude"} {
				if v, ok := action[f]; ok {
					res, err := json.Marshal(v)
					if err != nil {
						return nil, diag.FromErr(err)
					}
					allocateAction[f] = string(res)
				}
			}
			phase[actionName] = []interface{}{allocateAction}
		default:
			phase[actionName] = []interface{}{action}
		}
	}
	out[0] = phase
	return out, diags
}

// resourceIlmDelete removes the ILM policy from Elasticsearch.
func resourceIlmDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
	client, diags := clients.NewApiClientFromSDKResource(d, meta)
	if diags.HasError() {
		return diags
	}
	id := d.Id()
	compId, diags := clients.CompositeIdFromStr(id)
	if diags.HasError() {
		return diags
	}
	if diags := elasticsearch.DeleteIlm(ctx, client, compId.ResourceId); diags.HasError() {
		return diags
	}
	return diags
}