internal/clients/elasticsearch/index.go (584 lines of code) (raw):
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/terraform-provider-elasticstack/internal/clients"
"github.com/elastic/terraform-provider-elasticstack/internal/models"
"github.com/elastic/terraform-provider-elasticstack/internal/utils"
fwdiags "github.com/hashicorp/terraform-plugin-framework/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
)
// PutIlm sends an ILM put-lifecycle request for policy.Name. The request body
// is the policy wrapped in a top-level "policy" object.
//
// Error diagnostics are returned when the body cannot be marshalled, the
// Elasticsearch client cannot be obtained, the request itself fails, or
// utils.CheckError reports an error for the response. Otherwise the result
// is nil.
func PutIlm(ctx context.Context, apiClient *clients.ApiClient, policy *models.Policy) diag.Diagnostics {
	body, err := json.Marshal(map[string]interface{}{"policy": policy})
	if err != nil {
		return diag.FromErr(err)
	}

	es, err := apiClient.GetESClient()
	if err != nil {
		return diag.FromErr(err)
	}

	resp, err := es.ILM.PutLifecycle(
		policy.Name,
		es.ILM.PutLifecycle.WithBody(bytes.NewReader(body)),
		es.ILM.PutLifecycle.WithContext(ctx),
	)
	if err != nil {
		return diag.FromErr(err)
	}
	defer resp.Body.Close()

	if checked := utils.CheckError(resp, "Unable to create or update the ILM policy"); checked.HasError() {
		return checked
	}
	return nil
}
// GetIlm requests the ILM policy called policyName and returns its definition.
//
// A 404 response yields (nil, nil). Error diagnostics are returned when the
// Elasticsearch client cannot be obtained, the request fails,
// utils.CheckError reports an error, the body cannot be decoded, or the
// decoded response has no entry keyed by policyName.
func GetIlm(ctx context.Context, apiClient *clients.ApiClient, policyName string) (*models.PolicyDefinition, diag.Diagnostics) {
	es, err := apiClient.GetESClient()
	if err != nil {
		return nil, diag.FromErr(err)
	}

	resp, err := es.ILM.GetLifecycle(
		es.ILM.GetLifecycle.WithPolicy(policyName),
		es.ILM.GetLifecycle.WithContext(ctx),
	)
	if err != nil {
		return nil, diag.FromErr(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotFound {
		return nil, nil
	}
	if checked := utils.CheckError(resp, "Unable to fetch ILM policy from the cluster."); checked.HasError() {
		return nil, checked
	}

	// The body is decoded as a JSON object keyed by policy name.
	policies := make(map[string]models.PolicyDefinition)
	if err := json.NewDecoder(resp.Body).Decode(&policies); err != nil {
		return nil, diag.FromErr(err)
	}

	definition, found := policies[policyName]
	if !found {
		return nil, diag.Diagnostics{{
			Severity: diag.Error,
			Summary:  "Unable to find a ILM policy in the cluster",
			Detail:   fmt.Sprintf(`Unable to find "%s" ILM policy in the cluster`, policyName),
		}}
	}
	return &definition, nil
}
// DeleteIlm sends an ILM delete-lifecycle request for policyName.
//
// Error diagnostics are returned when the Elasticsearch client cannot be
// obtained, the request fails, or utils.CheckError reports an error for the
// response. Otherwise the result is nil.
func DeleteIlm(ctx context.Context, apiClient *clients.ApiClient, policyName string) diag.Diagnostics {
	es, err := apiClient.GetESClient()
	if err != nil {
		return diag.FromErr(err)
	}

	resp, err := es.ILM.DeleteLifecycle(policyName, es.ILM.DeleteLifecycle.WithContext(ctx))
	if err != nil {
		return diag.FromErr(err)
	}
	defer resp.Body.Close()

	checked := utils.CheckError(resp, "Unable to delete ILM policy.")
	if !checked.HasError() {
		return nil
	}
	return checked
}
// PutComponentTemplate sends a put-component-template request named
// template.Name, using the JSON encoding of template as the body.
//
// Error diagnostics are returned when the template cannot be marshalled, the
// Elasticsearch client cannot be obtained, the request fails, or
// utils.CheckError reports an error for the response. Otherwise the result
// is nil.
func PutComponentTemplate(ctx context.Context, apiClient *clients.ApiClient, template *models.ComponentTemplate) diag.Diagnostics {
	body, err := json.Marshal(template)
	if err != nil {
		return diag.FromErr(err)
	}

	es, err := apiClient.GetESClient()
	if err != nil {
		return diag.FromErr(err)
	}

	resp, err := es.Cluster.PutComponentTemplate(
		template.Name,
		bytes.NewReader(body),
		es.Cluster.PutComponentTemplate.WithContext(ctx),
	)
	if err != nil {
		return diag.FromErr(err)
	}
	defer resp.Body.Close()

	if checked := utils.CheckError(resp, "Unable to create component template"); checked.HasError() {
		return checked
	}
	return nil
}
// GetComponentTemplate requests the component template called templateName.
//
// A 404 response yields (nil, nil). Error diagnostics are returned when the
// Elasticsearch client cannot be obtained, the request fails,
// utils.CheckError reports an error, the body cannot be decoded, or the
// response does not contain exactly one component template.
func GetComponentTemplate(ctx context.Context, apiClient *clients.ApiClient, templateName string) (*models.ComponentTemplateResponse, diag.Diagnostics) {
	esClient, err := apiClient.GetESClient()
	if err != nil {
		return nil, diag.FromErr(err)
	}

	res, err := esClient.Cluster.GetComponentTemplate(
		esClient.Cluster.GetComponentTemplate.WithName(templateName),
		esClient.Cluster.GetComponentTemplate.WithContext(ctx),
	)
	if err != nil {
		return nil, diag.FromErr(err)
	}
	defer res.Body.Close()

	if res.StatusCode == http.StatusNotFound {
		return nil, nil
	}
	// The summary names the component template API so failures are not
	// mistaken for index template errors.
	if diags := utils.CheckError(res, "Unable to request component template."); diags.HasError() {
		return nil, diags
	}

	var componentTemplates models.ComponentTemplatesResponse
	if err := json.NewDecoder(res.Body).Decode(&componentTemplates); err != nil {
		return nil, diag.FromErr(err)
	}

	// A single template was requested by name, so anything other than one
	// result is treated as an error.
	if count := len(componentTemplates.ComponentTemplates); count != 1 {
		return nil, diag.Diagnostics{{
			Severity: diag.Error,
			Summary:  "Wrong number of templates returned",
			Detail:   fmt.Sprintf("Elasticsearch API returned %d when requested '%s' component template.", count, templateName),
		}}
	}

	tpl := componentTemplates.ComponentTemplates[0]
	return &tpl, nil
}
// DeleteComponentTemplate sends a delete-component-template request for
// templateName.
//
// Error diagnostics are returned when the Elasticsearch client cannot be
// obtained, the request fails, or utils.CheckError reports an error for the
// response. Otherwise the result is nil.
func DeleteComponentTemplate(ctx context.Context, apiClient *clients.ApiClient, templateName string) diag.Diagnostics {
	es, err := apiClient.GetESClient()
	if err != nil {
		return diag.FromErr(err)
	}

	resp, err := es.Cluster.DeleteComponentTemplate(
		templateName,
		es.Cluster.DeleteComponentTemplate.WithContext(ctx),
	)
	if err != nil {
		return diag.FromErr(err)
	}
	defer resp.Body.Close()

	checked := utils.CheckError(resp, "Unable to delete component template")
	if !checked.HasError() {
		return nil
	}
	return checked
}
// PutIndexTemplate sends a put-index-template request named template.Name,
// using the JSON encoding of template as the body.
//
// Error diagnostics are returned when the template cannot be marshalled, the
// Elasticsearch client cannot be obtained, the request fails, or
// utils.CheckError reports an error for the response. Otherwise the result
// is nil.
func PutIndexTemplate(ctx context.Context, apiClient *clients.ApiClient, template *models.IndexTemplate) diag.Diagnostics {
	body, err := json.Marshal(template)
	if err != nil {
		return diag.FromErr(err)
	}

	es, err := apiClient.GetESClient()
	if err != nil {
		return diag.FromErr(err)
	}

	resp, err := es.Indices.PutIndexTemplate(
		template.Name,
		bytes.NewReader(body),
		es.Indices.PutIndexTemplate.WithContext(ctx),
	)
	if err != nil {
		return diag.FromErr(err)
	}
	defer resp.Body.Close()

	if checked := utils.CheckError(resp, "Unable to create index template"); checked.HasError() {
		return checked
	}
	return nil
}
// GetIndexTemplate requests the index template called templateName.
//
// A 404 response yields (nil, nil). Error diagnostics are returned when the
// Elasticsearch client cannot be obtained, the request fails,
// utils.CheckError reports an error, the body cannot be decoded, or the
// response does not contain exactly one index template.
func GetIndexTemplate(ctx context.Context, apiClient *clients.ApiClient, templateName string) (*models.IndexTemplateResponse, diag.Diagnostics) {
	es, err := apiClient.GetESClient()
	if err != nil {
		return nil, diag.FromErr(err)
	}

	resp, err := es.Indices.GetIndexTemplate(
		es.Indices.GetIndexTemplate.WithName(templateName),
		es.Indices.GetIndexTemplate.WithContext(ctx),
	)
	if err != nil {
		return nil, diag.FromErr(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotFound {
		return nil, nil
	}
	if checked := utils.CheckError(resp, "Unable to request index template."); checked.HasError() {
		return nil, checked
	}

	var decoded models.IndexTemplatesResponse
	if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil {
		return nil, diag.FromErr(err)
	}

	// A single template was requested by name, so anything other than one
	// result is treated as an error.
	if count := len(decoded.IndexTemplates); count != 1 {
		return nil, diag.Diagnostics{{
			Severity: diag.Error,
			Summary:  "Wrong number of templates returned",
			Detail:   fmt.Sprintf("Elasticsearch API returned %d when requested '%s' template.", count, templateName),
		}}
	}

	only := decoded.IndexTemplates[0]
	return &only, nil
}
// DeleteIndexTemplate sends a delete-index-template request for templateName.
//
// Error diagnostics are returned when the Elasticsearch client cannot be
// obtained, the request fails, or utils.CheckError reports an error for the
// response. Otherwise the result is nil.
func DeleteIndexTemplate(ctx context.Context, apiClient *clients.ApiClient, templateName string) diag.Diagnostics {
	es, err := apiClient.GetESClient()
	if err != nil {
		return diag.FromErr(err)
	}

	resp, err := es.Indices.DeleteIndexTemplate(
		templateName,
		es.Indices.DeleteIndexTemplate.WithContext(ctx),
	)
	if err != nil {
		return diag.FromErr(err)
	}
	defer resp.Body.Close()

	checked := utils.CheckError(resp, "Unable to delete index template")
	if !checked.HasError() {
		return nil
	}
	return checked
}
// PutIndex sends a create-index request named index.Name, using the JSON
// encoding of index as the body. The wait-for-active-shards, master timeout
// and timeout options are taken from params.
//
// Failures to marshal the index, obtain the Elasticsearch client, or perform
// the request are returned as a single error diagnostic whose summary and
// detail are both the error text. Otherwise the result of utils.CheckError
// is converted to framework diagnostics and returned.
func PutIndex(ctx context.Context, apiClient *clients.ApiClient, index *models.Index, params *models.PutIndexParams) fwdiags.Diagnostics {
	// asDiags wraps a plain error into framework diagnostics.
	asDiags := func(e error) fwdiags.Diagnostics {
		return fwdiags.Diagnostics{
			fwdiags.NewErrorDiagnostic(e.Error(), e.Error()),
		}
	}

	body, err := json.Marshal(index)
	if err != nil {
		return asDiags(err)
	}

	es, err := apiClient.GetESClient()
	if err != nil {
		return asDiags(err)
	}

	createOpts := []func(*esapi.IndicesCreateRequest){
		es.Indices.Create.WithBody(bytes.NewReader(body)),
		es.Indices.Create.WithContext(ctx),
		es.Indices.Create.WithWaitForActiveShards(params.WaitForActiveShards),
		es.Indices.Create.WithMasterTimeout(params.MasterTimeout),
		es.Indices.Create.WithTimeout(params.Timeout),
	}

	resp, err := es.Indices.Create(index.Name, createOpts...)
	if err != nil {
		return asDiags(err)
	}
	defer resp.Body.Close()

	summary := fmt.Sprintf("Unable to create index: %s", index.Name)
	return utils.FrameworkDiagsFromSDK(utils.CheckError(resp, summary))
}
// DeleteIndex sends a delete-index request for the single index called name.
//
// Failures to obtain the Elasticsearch client or perform the request are
// returned as a single error diagnostic whose summary and detail are both
// the error text. Otherwise the result of utils.CheckError is converted to
// framework diagnostics and returned.
func DeleteIndex(ctx context.Context, apiClient *clients.ApiClient, name string) fwdiags.Diagnostics {
	// asDiags wraps a plain error into framework diagnostics.
	asDiags := func(e error) fwdiags.Diagnostics {
		return fwdiags.Diagnostics{
			fwdiags.NewErrorDiagnostic(e.Error(), e.Error()),
		}
	}

	es, err := apiClient.GetESClient()
	if err != nil {
		return asDiags(err)
	}

	resp, err := es.Indices.Delete([]string{name}, es.Indices.Delete.WithContext(ctx))
	if err != nil {
		return asDiags(err)
	}
	defer resp.Body.Close()

	summary := fmt.Sprintf("Unable to delete the index: %s", name)
	return utils.FrameworkDiagsFromSDK(utils.CheckError(resp, summary))
}
// GetIndex looks up the index called name via GetIndices and returns the
// entry stored under exactly that key.
//
// If GetIndices reports an error its diagnostics are returned unchanged.
// When the result has no entry for name, (nil, nil) is returned.
func GetIndex(ctx context.Context, apiClient *clients.ApiClient, name string) (*models.Index, fwdiags.Diagnostics) {
	all, diags := GetIndices(ctx, apiClient, name)
	if diags.HasError() {
		return nil, diags
	}

	match, found := all[name]
	if !found {
		return nil, nil
	}
	return &match, nil
}
// GetIndices sends a get-index request for name with flat settings enabled
// and decodes the response into a map of models.Index values.
//
// A 404 response yields (nil, nil). Failures to obtain the Elasticsearch
// client, perform the request, or decode the body are returned as a single
// error diagnostic whose summary and detail are both the error text. An
// error reported by utils.CheckError is converted to framework diagnostics.
func GetIndices(ctx context.Context, apiClient *clients.ApiClient, name string) (map[string]models.Index, fwdiags.Diagnostics) {
	// asDiags wraps a plain error into framework diagnostics.
	asDiags := func(e error) fwdiags.Diagnostics {
		return fwdiags.Diagnostics{
			fwdiags.NewErrorDiagnostic(e.Error(), e.Error()),
		}
	}

	es, err := apiClient.GetESClient()
	if err != nil {
		return nil, asDiags(err)
	}

	resp, err := es.Indices.Get(
		[]string{name},
		es.Indices.Get.WithFlatSettings(true),
		es.Indices.Get.WithContext(ctx),
	)
	if err != nil {
		return nil, asDiags(err)
	}
	defer resp.Body.Close()

	// A missing index is not an error: a nil map with no diagnostics is
	// returned so callers can tell "not found" apart from a failure.
	if resp.StatusCode == http.StatusNotFound {
		return nil, nil
	}

	summary := fmt.Sprintf("Unable to get requested index: %s", name)
	if checked := utils.CheckError(resp, summary); checked.HasError() {
		return nil, utils.FrameworkDiagsFromSDK(checked)
	}

	found := make(map[string]models.Index)
	if err := json.NewDecoder(resp.Body).Decode(&found); err != nil {
		return nil, asDiags(err)
	}
	return found, nil
}
// DeleteIndexAlias sends a delete-alias request removing aliases from the
// single index called index.
//
// Failures to obtain the Elasticsearch client or perform the request are
// returned as a single error diagnostic whose summary and detail are both
// the error text. Otherwise the result of utils.CheckError is converted to
// framework diagnostics and returned.
func DeleteIndexAlias(ctx context.Context, apiClient *clients.ApiClient, index string, aliases []string) fwdiags.Diagnostics {
	esClient, err := apiClient.GetESClient()
	if err != nil {
		return fwdiags.Diagnostics{
			fwdiags.NewErrorDiagnostic(err.Error(), err.Error()),
		}
	}

	res, err := esClient.Indices.DeleteAlias([]string{index}, aliases, esClient.Indices.DeleteAlias.WithContext(ctx))
	if err != nil {
		return fwdiags.Diagnostics{
			fwdiags.NewErrorDiagnostic(err.Error(), err.Error()),
		}
	}
	defer res.Body.Close()

	// Argument order follows the format string: aliases first, then index.
	diags := utils.CheckError(res, fmt.Sprintf("Unable to delete aliases '%v' for index '%s'", aliases, index))
	return utils.FrameworkDiagsFromSDK(diags)
}
// UpdateIndexAlias sends a put-alias request for alias.Name on the single
// index called index, using the JSON encoding of alias as the body.
//
// Failures to marshal the alias, obtain the Elasticsearch client, or perform
// the request are returned as a single error diagnostic whose summary and
// detail are both the error text. Otherwise the result of utils.CheckError
// is converted to framework diagnostics and returned.
func UpdateIndexAlias(ctx context.Context, apiClient *clients.ApiClient, index string, alias *models.IndexAlias) fwdiags.Diagnostics {
	aliasBytes, err := json.Marshal(alias)
	if err != nil {
		return fwdiags.Diagnostics{
			fwdiags.NewErrorDiagnostic(err.Error(), err.Error()),
		}
	}

	esClient, err := apiClient.GetESClient()
	if err != nil {
		return fwdiags.Diagnostics{
			fwdiags.NewErrorDiagnostic(err.Error(), err.Error()),
		}
	}

	req := esClient.Indices.PutAlias.WithBody(bytes.NewReader(aliasBytes))
	res, err := esClient.Indices.PutAlias([]string{index}, alias.Name, req, esClient.Indices.PutAlias.WithContext(ctx))
	if err != nil {
		return fwdiags.Diagnostics{
			fwdiags.NewErrorDiagnostic(err.Error(), err.Error()),
		}
	}
	defer res.Body.Close()

	// Argument order follows the format string: alias name first, then index.
	diags := utils.CheckError(res, fmt.Sprintf("Unable to update alias '%v' for index '%s'", alias.Name, index))
	return utils.FrameworkDiagsFromSDK(diags)
}
// UpdateIndexSettings sends a put-settings request scoped to index, using
// the JSON encoding of settings as the body.
//
// Failures to marshal the settings, obtain the Elasticsearch client, or
// perform the request are returned as a single error diagnostic whose
// summary and detail are both the error text. Otherwise the result of
// utils.CheckError is converted to framework diagnostics and returned.
func UpdateIndexSettings(ctx context.Context, apiClient *clients.ApiClient, index string, settings map[string]interface{}) fwdiags.Diagnostics {
	// asDiags wraps a plain error into framework diagnostics.
	asDiags := func(e error) fwdiags.Diagnostics {
		return fwdiags.Diagnostics{
			fwdiags.NewErrorDiagnostic(e.Error(), e.Error()),
		}
	}

	body, err := json.Marshal(settings)
	if err != nil {
		return asDiags(err)
	}

	es, err := apiClient.GetESClient()
	if err != nil {
		return asDiags(err)
	}

	resp, err := es.Indices.PutSettings(
		bytes.NewReader(body),
		es.Indices.PutSettings.WithIndex(index),
		es.Indices.PutSettings.WithContext(ctx),
	)
	if err != nil {
		return asDiags(err)
	}
	defer resp.Body.Close()

	return utils.FrameworkDiagsFromSDK(utils.CheckError(resp, "Unable to update index settings"))
}
// UpdateIndexMappings sends a put-mapping request for the single index
// called index, passing the mappings string through unchanged as the body.
//
// Failures to obtain the Elasticsearch client or perform the request are
// returned as a single error diagnostic whose summary and detail are both
// the error text. Otherwise the result of utils.CheckError is converted to
// framework diagnostics and returned.
func UpdateIndexMappings(ctx context.Context, apiClient *clients.ApiClient, index, mappings string) fwdiags.Diagnostics {
	// asDiags wraps a plain error into framework diagnostics.
	asDiags := func(e error) fwdiags.Diagnostics {
		return fwdiags.Diagnostics{
			fwdiags.NewErrorDiagnostic(e.Error(), e.Error()),
		}
	}

	es, err := apiClient.GetESClient()
	if err != nil {
		return asDiags(err)
	}

	resp, err := es.Indices.PutMapping(
		[]string{index},
		strings.NewReader(mappings),
		es.Indices.PutMapping.WithContext(ctx),
	)
	if err != nil {
		return asDiags(err)
	}
	defer resp.Body.Close()

	return utils.FrameworkDiagsFromSDK(utils.CheckError(resp, "Unable to update index mappings"))
}
// PutDataStream sends a create-data-stream request for dataStreamName.
//
// Error diagnostics are returned when the Elasticsearch client cannot be
// obtained, the request fails, or utils.CheckError reports an error for the
// response. Otherwise the result is nil.
func PutDataStream(ctx context.Context, apiClient *clients.ApiClient, dataStreamName string) diag.Diagnostics {
	es, err := apiClient.GetESClient()
	if err != nil {
		return diag.FromErr(err)
	}

	resp, err := es.Indices.CreateDataStream(dataStreamName, es.Indices.CreateDataStream.WithContext(ctx))
	if err != nil {
		return diag.FromErr(err)
	}
	defer resp.Body.Close()

	summary := fmt.Sprintf("Unable to create DataStream: %s", dataStreamName)
	if checked := utils.CheckError(resp, summary); checked.HasError() {
		return checked
	}
	return nil
}
// GetDataStream requests the data stream called dataStreamName and returns
// the first entry of the "data_streams" list in the response.
//
// (nil, nil) is returned when the response status is 404 or when the
// response carries no data streams. Error diagnostics are returned when the
// Elasticsearch client cannot be obtained, the request fails,
// utils.CheckError reports an error, or the body cannot be decoded.
func GetDataStream(ctx context.Context, apiClient *clients.ApiClient, dataStreamName string) (*models.DataStream, diag.Diagnostics) {
	esClient, err := apiClient.GetESClient()
	if err != nil {
		return nil, diag.FromErr(err)
	}

	req := esClient.Indices.GetDataStream.WithName(dataStreamName)
	res, err := esClient.Indices.GetDataStream(req, esClient.Indices.GetDataStream.WithContext(ctx))
	if err != nil {
		return nil, diag.FromErr(err)
	}
	defer res.Body.Close()

	if res.StatusCode == http.StatusNotFound {
		return nil, nil
	}
	if diags := utils.CheckError(res, fmt.Sprintf("Unable to get requested DataStream: %s", dataStreamName)); diags.HasError() {
		return nil, diags
	}

	dStreams := make(map[string][]models.DataStream)
	if err := json.NewDecoder(res.Body).Decode(&dStreams); err != nil {
		return nil, diag.FromErr(err)
	}

	// An empty or missing "data_streams" list is handled like the 404 case
	// instead of indexing into it, which would panic.
	streams := dStreams["data_streams"]
	if len(streams) == 0 {
		return nil, nil
	}

	// The requested data stream is taken to be the first entry of the list.
	ds := streams[0]
	return &ds, nil
}
// DeleteDataStream sends a delete-data-stream request for dataStreamName.
//
// Error diagnostics are returned when the Elasticsearch client cannot be
// obtained, the request fails, or utils.CheckError reports an error for the
// response. Otherwise the result is nil.
func DeleteDataStream(ctx context.Context, apiClient *clients.ApiClient, dataStreamName string) diag.Diagnostics {
	es, err := apiClient.GetESClient()
	if err != nil {
		return diag.FromErr(err)
	}

	resp, err := es.Indices.DeleteDataStream(
		[]string{dataStreamName},
		es.Indices.DeleteDataStream.WithContext(ctx),
	)
	if err != nil {
		return diag.FromErr(err)
	}
	defer resp.Body.Close()

	summary := fmt.Sprintf("Unable to delete DataStream: %s", dataStreamName)
	if checked := utils.CheckError(resp, summary); checked.HasError() {
		return checked
	}
	return nil
}
// PutDataStreamLifecycle sends a put-data-lifecycle request for
// dataStreamName, using the JSON encoding of lifecycle as the body and
// expand_wildcards as the expand-wildcards option.
//
// Failures to obtain the Elasticsearch client, marshal the lifecycle, or
// perform the request are converted with utils.FrameworkDiagFromError. An
// error reported by utils.CheckError is converted to framework diagnostics.
// Otherwise the result is nil.
func PutDataStreamLifecycle(ctx context.Context, apiClient *clients.ApiClient, dataStreamName string, expand_wildcards string, lifecycle models.LifecycleSettings) fwdiags.Diagnostics {
	es, err := apiClient.GetESClient()
	if err != nil {
		return utils.FrameworkDiagFromError(err)
	}

	body, err := json.Marshal(lifecycle)
	if err != nil {
		return utils.FrameworkDiagFromError(err)
	}

	resp, err := es.Indices.PutDataLifecycle(
		[]string{dataStreamName},
		es.Indices.PutDataLifecycle.WithBody(bytes.NewReader(body)),
		es.Indices.PutDataLifecycle.WithContext(ctx),
		es.Indices.PutDataLifecycle.WithExpandWildcards(expand_wildcards),
	)
	if err != nil {
		return utils.FrameworkDiagFromError(err)
	}
	defer resp.Body.Close()

	summary := fmt.Sprintf("Unable to create DataStreamLifecycle: %s", dataStreamName)
	checked := utils.CheckError(resp, summary)
	if !checked.HasError() {
		return nil
	}
	return utils.FrameworkDiagsFromSDK(checked)
}
// GetDataStreamLifecycle sends a get-data-lifecycle request for
// dataStreamName with expand_wildcards as the expand-wildcards option, and
// returns a pointer to the decoded "data_streams" list.
//
// A 404 response yields (nil, nil). Failures to obtain the Elasticsearch
// client, perform the request, or decode the body are converted with
// utils.FrameworkDiagFromError. An error reported by utils.CheckError is
// converted to framework diagnostics.
func GetDataStreamLifecycle(ctx context.Context, apiClient *clients.ApiClient, dataStreamName string, expand_wildcards string) (*[]models.DataStreamLifecycle, fwdiags.Diagnostics) {
	es, err := apiClient.GetESClient()
	if err != nil {
		return nil, utils.FrameworkDiagFromError(err)
	}

	resp, err := es.Indices.GetDataLifecycle(
		[]string{dataStreamName},
		es.Indices.GetDataLifecycle.WithContext(ctx),
		es.Indices.GetDataLifecycle.WithExpandWildcards(expand_wildcards),
	)
	if err != nil {
		return nil, utils.FrameworkDiagFromError(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotFound {
		return nil, nil
	}

	summary := fmt.Sprintf("Unable to get requested DataStreamLifecycle: %s", dataStreamName)
	if checked := utils.CheckError(resp, summary); checked.HasError() {
		return nil, utils.FrameworkDiagsFromSDK(checked)
	}

	// Only the "data_streams" field of the response body is read.
	var payload struct {
		DataStreams []models.DataStreamLifecycle `json:"data_streams,omitempty"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return nil, utils.FrameworkDiagFromError(err)
	}

	lifecycles := payload.DataStreams
	return &lifecycles, nil
}
// DeleteDataStreamLifecycle sends a delete-data-lifecycle request for
// dataStreamName with expand_wildcards as the expand-wildcards option.
//
// Failures to obtain the Elasticsearch client or perform the request are
// converted with utils.FrameworkDiagFromError. An error reported by
// utils.CheckError is converted to framework diagnostics. Otherwise the
// result is nil.
func DeleteDataStreamLifecycle(ctx context.Context, apiClient *clients.ApiClient, dataStreamName string, expand_wildcards string) fwdiags.Diagnostics {
	es, err := apiClient.GetESClient()
	if err != nil {
		return utils.FrameworkDiagFromError(err)
	}

	resp, err := es.Indices.DeleteDataLifecycle(
		[]string{dataStreamName},
		es.Indices.DeleteDataLifecycle.WithContext(ctx),
		es.Indices.DeleteDataLifecycle.WithExpandWildcards(expand_wildcards),
	)
	if err != nil {
		return utils.FrameworkDiagFromError(err)
	}
	defer resp.Body.Close()

	summary := fmt.Sprintf("Unable to delete DataStreamLifecycle: %s", dataStreamName)
	checked := utils.CheckError(resp, summary)
	if !checked.HasError() {
		return nil
	}
	return utils.FrameworkDiagsFromSDK(checked)
}
// PutIngestPipeline sends a put-pipeline request named pipeline.Name, using
// the JSON encoding of pipeline as the body.
//
// Error diagnostics are returned when the pipeline cannot be marshalled, the
// Elasticsearch client cannot be obtained, the request fails, or
// utils.CheckError reports an error for the response. Otherwise the result
// is nil.
func PutIngestPipeline(ctx context.Context, apiClient *clients.ApiClient, pipeline *models.IngestPipeline) diag.Diagnostics {
	body, err := json.Marshal(pipeline)
	if err != nil {
		return diag.FromErr(err)
	}

	es, err := apiClient.GetESClient()
	if err != nil {
		return diag.FromErr(err)
	}

	resp, err := es.Ingest.PutPipeline(
		pipeline.Name,
		bytes.NewReader(body),
		es.Ingest.PutPipeline.WithContext(ctx),
	)
	if err != nil {
		return diag.FromErr(err)
	}
	defer resp.Body.Close()

	summary := fmt.Sprintf("Unable to create or update ingest pipeline: %s", pipeline.Name)
	if checked := utils.CheckError(resp, summary); checked.HasError() {
		return checked
	}
	return nil
}
// GetIngestPipeline requests the ingest pipeline whose ID is *name and
// returns the entry stored under that key in the decoded response, with its
// Name field set to *name.
//
// A 404 response yields (nil, nil). Error diagnostics are returned when the
// Elasticsearch client cannot be obtained, the request fails,
// utils.CheckError reports an error, or the body cannot be decoded. name is
// dereferenced without a nil check.
func GetIngestPipeline(ctx context.Context, apiClient *clients.ApiClient, name *string) (*models.IngestPipeline, diag.Diagnostics) {
	es, err := apiClient.GetESClient()
	if err != nil {
		return nil, diag.FromErr(err)
	}

	pipelineID := *name
	resp, err := es.Ingest.GetPipeline(
		es.Ingest.GetPipeline.WithPipelineID(pipelineID),
		es.Ingest.GetPipeline.WithContext(ctx),
	)
	if err != nil {
		return nil, diag.FromErr(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotFound {
		return nil, nil
	}

	summary := fmt.Sprintf("Unable to get requested ingest pipeline: %s", pipelineID)
	if checked := utils.CheckError(resp, summary); checked.HasError() {
		return nil, checked
	}

	// The body is decoded as a JSON object keyed by pipeline ID.
	byID := make(map[string]models.IngestPipeline)
	if err := json.NewDecoder(resp.Body).Decode(&byID); err != nil {
		return nil, diag.FromErr(err)
	}

	// The name is set explicitly on the returned value from the requested ID.
	result := byID[pipelineID]
	result.Name = pipelineID
	return &result, nil
}
// DeleteIngestPipeline sends a delete-pipeline request for the ingest
// pipeline whose ID is *name.
//
// Error diagnostics are returned when the Elasticsearch client cannot be
// obtained, the request fails, or utils.CheckError reports an error for the
// response. Otherwise the result is nil. name is dereferenced without a nil
// check.
func DeleteIngestPipeline(ctx context.Context, apiClient *clients.ApiClient, name *string) diag.Diagnostics {
	esClient, err := apiClient.GetESClient()
	if err != nil {
		return diag.FromErr(err)
	}

	res, err := esClient.Ingest.DeletePipeline(*name, esClient.Ingest.DeletePipeline.WithContext(ctx))
	if err != nil {
		// Surface the request failure; returning empty diagnostics here
		// would report a failed delete as a success.
		return diag.FromErr(err)
	}
	defer res.Body.Close()

	if diags := utils.CheckError(res, fmt.Sprintf("Unable to delete ingest pipeline: %s", *name)); diags.HasError() {
		return diags
	}
	return nil
}