pkg/controller/elasticsearch/client/client.go (102 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package client import ( "context" "crypto/x509" "fmt" "math" "net/http" "time" "go.elastic.co/apm/module/apmelasticsearch/v2" "k8s.io/apimachinery/pkg/types" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/annotation" commonhttp "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/http" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/net" ) const ( // ESClientTimeoutAnnotation is the name of the annotation used to set the Elasticsearch client timeout. ESClientTimeoutAnnotation = "eck.k8s.elastic.co/es-client-timeout" ) // DefaultESClientTimeout is the default timeout value for Elasticsearch requests. var DefaultESClientTimeout = 3 * time.Minute // BasicAuth contains credentials for an Elasticsearch user. type BasicAuth struct { Name string Password string } type IndexRole struct { Names []string `json:"names,omitempty"` Privileges []string `json:",omitempty"` AllowRestrictedIndices *bool `json:"allow_restricted_indices,omitempty" yaml:"allow_restricted_indices,omitempty"` } type ApplicationRole struct { Application string `json:"application,omitempty"` Privileges []string `json:"privileges,omitempty"` Resources []string `json:"resources,omitempty"` } // Role represents an Elasticsearch role. type Role struct { Cluster []string `json:"cluster,omitempty"` Indices []IndexRole `json:"indices,omitempty"` Applications []ApplicationRole `json:"applications,omitempty"` Metadata map[string]any `json:"metadata,omitempty"` } // Client captures the information needed to interact with an Elasticsearch cluster via HTTP type Client interface { AllocationSetter AutoscalingClient DesiredNodesClient ShardLister LicenseClient RemoteClusterClient SecurityClient // Close idle connections in the underlying http client. Close() // Equal returns true if other can be considered as the same client. Equal(other Client) bool // GetClusterInfo get the cluster information at / GetClusterInfo(ctx context.Context) (Info, error) // GetClusterRoutingAllocation retrieves the cluster routing allocation settings. GetClusterRoutingAllocation(ctx context.Context) (ClusterRoutingAllocation, error) // DisableReplicaShardsAllocation disables shards allocation on the cluster (only primaries are allocated). DisableReplicaShardsAllocation(ctx context.Context) error // EnableShardAllocation enables shards allocation on the cluster. EnableShardAllocation(ctx context.Context) error // RemoveTransientAllocationSettings removes allocation filters and enablement settings. RemoveTransientAllocationSettings(ctx context.Context) error // SyncedFlush requests a synced flush on the cluster. Deprecated in 7.6, removed in 8.0. // This is "best-effort", see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html. SyncedFlush(ctx context.Context) error // Flush requests a flush on the cluster. Flush(ctx context.Context) error // GetClusterHealth calls the _cluster/health api. GetClusterHealth(ctx context.Context) (Health, error) // GetClusterHealthWaitForAllEvents calls _cluster/health?wait_for_events=languid&timeout=0s GetClusterHealthWaitForAllEvents(ctx context.Context) (Health, error) // GetClusterState calls the _cluster/state api. GetClusterState(ctx context.Context) (ClusterState, error) // SetMinimumMasterNodes sets the transient and persistent setting of the same name in cluster settings. SetMinimumMasterNodes(ctx context.Context, n int) error // ReloadSecureSettings will decrypt and re-read the entire keystore, on every cluster node, // but only the reloadable secure settings will be applied ReloadSecureSettings(ctx context.Context) error // GetNodes calls the _nodes api to return a map(nodeName -> Node) GetNodes(ctx context.Context) (Nodes, error) // GetNodesStats calls the _nodes/stats api to return a map(nodeName -> NodeStats) GetNodesStats(ctx context.Context) (NodesStats, error) // ClusterBootstrappedForZen2 returns true if the cluster is relying on zen2 orchestration. ClusterBootstrappedForZen2(ctx context.Context) (bool, error) // AddVotingConfigExclusions sets the transient and persistent setting of the same name in cluster settings. // Introduced in: Elasticsearch 7.0.0 AddVotingConfigExclusions(ctx context.Context, nodeNames []string) error // DeleteVotingConfigExclusions sets the transient and persistent setting of the same name in cluster settings. // // Introduced in: Elasticsearch 7.0.0 DeleteVotingConfigExclusions(ctx context.Context, waitForRemoval bool) error // GetShutdown returns information about ongoing node shutdowns. // Introduced in: Elasticsearch 7.14.0 GetShutdown(ctx context.Context, nodeID *string) (ShutdownResponse, error) // PutShutdown initiates a node shutdown procedure for the given node. // Introduced in: Elasticsearch 7.14.0 PutShutdown(ctx context.Context, nodeID string, shutdownType ShutdownType, reason string) error // DeleteShutdown attempts to cancel an ongoing node shutdown. // Introduced in: Elasticsearch 7.14.0 DeleteShutdown(ctx context.Context, nodeID string) error // Request exposes a low level interface to the underlying HTTP client e.g. for testing purposes. // The Elasticsearch endpoint will be added automatically to the request URL which should therefore just be the path // with a leading / Request(ctx context.Context, r *http.Request) (*http.Response, error) // Version returns the Elasticsearch version this client is constructed for which should equal the minimal version // in the cluster. Version() version.Version // HasProperties checks whether this client has the indicated properties. HasProperties(version version.Version, user BasicAuth, url URLProvider, caCerts []*x509.Certificate) bool } // Timeout returns the Elasticsearch client timeout value for the given Elasticsearch resource. func Timeout(ctx context.Context, es esv1.Elasticsearch) time.Duration { return annotation.ExtractTimeout(ctx, es.ObjectMeta, ESClientTimeoutAnnotation, DefaultESClientTimeout) } func formatAsSeconds(d time.Duration) string { return fmt.Sprintf("%.0fs", math.Round(d.Seconds())) } // NewElasticsearchClient creates a new client for the target cluster. // // If dialer is not nil, it will be used to create new TCP connections func NewElasticsearchClient( dialer net.Dialer, es types.NamespacedName, esURL URLProvider, esUser BasicAuth, v version.Version, caCerts []*x509.Certificate, timeout time.Duration, debug bool, ) Client { client := commonhttp.Client(dialer, caCerts, timeout) client.Transport = apmelasticsearch.WrapRoundTripper(client.Transport) base := &baseClient{ URLProvider: esURL, User: esUser, caCerts: caCerts, HTTP: client, es: es, debug: debug, } return versioned(base, v) }