pkg/controller/elasticsearch/client/model.go (384 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package client import ( "encoding/json" "fmt" "strings" "time" "github.com/pkg/errors" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/stringsutil" ) // Info represents the response from / type Info struct { ClusterName string `json:"cluster_name"` ClusterUUID string `json:"cluster_uuid"` Version struct { Number string `json:"number"` } `json:"version"` } // Health represents the response from _cluster/health type Health struct { ClusterName string `json:"cluster_name"` Status esv1.ElasticsearchHealth `json:"status"` TimedOut bool `json:"timed_out"` NumberOfNodes int `json:"number_of_nodes"` NumberOfDataNodes int `json:"number_of_data_nodes"` ActivePrimaryShards int `json:"active_primary_shards"` ActiveShards int `json:"active_shards"` RelocatingShards int `json:"relocating_shards"` InitializingShards int `json:"initializing_shards"` UnassignedShards int `json:"unassigned_shards"` DelayedUnassignedShards int `json:"delayed_unassigned_shards"` NumberOfPendingTasks int `json:"number_of_pending_tasks"` NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"` TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"` ActiveShardsPercentAsNumber float32 `json:"active_shards_percent_as_number"` } // HasShardActivity indicates that there is some shard activity in the cluster. // It can be the case if some shards are being fetched, relocated or initialized. // It's only reliable if Health result was created with wait_for_events=languid // so that there are no pending initialisations in the task queue. // It returns true if the status request has timed out. 
func (h Health) HasShardActivity() bool { return h.TimedOut || // make sure request did not time out (i.e. no pending events) h.NumberOfInFlightFetch > 0 || // no shards being fetched h.InitializingShards > 0 || // no shards initializing h.RelocatingShards > 0 // no shards relocating } type ShardState string // These are possible shard states const ( STARTED ShardState = "STARTED" INITIALIZING ShardState = "INITIALIZING" RELOCATING ShardState = "RELOCATING" UNASSIGNED ShardState = "UNASSIGNED" ) type ShardType string const ( Primary ShardType = "p" Replica ShardType = "r" ) // Nodes partially models the response from a request to /_nodes type Nodes struct { Nodes map[string]Node `json:"nodes"` } func (n Nodes) Names() []string { names := make([]string, 0, len(n.Nodes)) for _, node := range n.Nodes { names = append(names, node.Name) } return names } // Node partially models an Elasticsearch node retrieved from /_nodes type Node struct { Name string `json:"name"` Version string `json:"version"` Roles []string `json:"roles"` } func (n Node) isV7OrAbove() (bool, error) { v, err := version.Parse(n.Version) if err != nil { return false, errors.Wrap(err, fmt.Sprintf("unable to parse node version %s", n.Version)) } return v.Major >= 7, nil } // NodesStats partially models the response from a request to /_nodes/stats type NodesStats struct { Nodes map[string]NodeStats `json:"nodes"` } // NodeStats partially models an Elasticsearch node retrieved from /_nodes/stats type NodeStats struct { Name string `json:"name"` OS struct { CGroup *CGroup `json:"cgroup"` } `json:"os"` } type CGroup struct { Memory struct { LimitInBytes string `json:"limit_in_bytes"` } `json:"memory"` CPU struct { CFSPeriodMicros int `json:"cfs_period_micros"` CFSQuotaMicros int `json:"cfs_quota_micros"` } `json:"cpu"` } // ClusterStateNode represents an element in the `node` structure in // Elasticsearch cluster state. 
type ClusterStateNode struct { Name string `json:"name"` EphemeralID string `json:"ephemeral_id"` TransportAddress string `json:"transport_address"` Attributes struct { MlMachineMemory string `json:"ml.machine_memory"` MlMaxOpenJobs string `json:"ml.max_open_jobs"` XpackInstalled string `json:"xpack.installed"` MlEnabled string `json:"ml.enabled"` } `json:"attributes"` } // Shards contains the shards in the Elasticsearch cluster type Shards []Shard // Shard partially models Elasticsearch cluster shard. type Shard struct { Index string `json:"index"` Shard string `json:"shard"` State ShardState `json:"state"` NodeName string `json:"node"` Type ShardType `json:"prirep"` } type RoutingTable struct { Indices map[string]Shards `json:"indices"` } // GetShardsByNode returns shards by node. // The result is a map with the name of the nodes as keys and the list of shards on the nodes as values. func (s Shards) GetShardsByNode() map[string]Shards { result := make(map[string]Shards) for _, shard := range s { // Unassigned shards are ignored if len(shard.NodeName) > 0 { result[shard.NodeName] = append(result[shard.NodeName], shard) } } return result } // Strip extra information from the nodeName field // eg. "cluster-node-2 -> 10.56.2.33 8DqGuLtrSNyMfE2EfKNDgg" becomes "cluster-node-2" // see https://github.com/elastic/cloud-on-k8s/issues/1796 func (s *Shards) UnmarshalJSON(data []byte) error { type Alias Shards aux := (*Alias)(s) if err := json.Unmarshal(data, &aux); err != nil { return err } for i, shard := range *aux { if idx := strings.IndexByte(shard.NodeName, ' '); idx >= 0 { (*s)[i].NodeName = (*s)[i].NodeName[:idx] } } return nil } // IsRelocating is true if the shard is relocating to another node. func (s Shard) IsRelocating() bool { return s.State == RELOCATING } // IsStarted is true if the shard is started on its current node. func (s Shard) IsStarted() bool { return s.State == STARTED } // IsInitializing is true if the shard is currently initializing on the node. 
func (s Shard) IsInitializing() bool {
	return s.State == INITIALIZING
}

// IsReplica is true if the shard is a replica.
func (s Shard) IsReplica() bool {
	return s.Type == Replica
}

// IsPrimary is true if the shard is a primary shard.
func (s Shard) IsPrimary() bool {
	return s.Type == Primary
}

// Key is a composite key of index name and shard number that identifies all
// copies of a shard across nodes.
func (s Shard) Key() string {
	return stringsutil.Concat(s.Index, "/", s.Shard)
}

// AllocationSettings model a subset of the supported attributes for dynamic Elasticsearch cluster settings.
type AllocationSettings struct {
	Cluster ClusterRoutingSettings `json:"cluster,omitempty"`
} // TODO awareness settings

// ClusterRoutingSettings models the "routing" section of the cluster settings.
type ClusterRoutingSettings struct {
	Routing RoutingSettings `json:"routing,omitempty"`
}

// RoutingSettings models the "allocation" section of the routing settings.
type RoutingSettings struct {
	Allocation RoutingAllocationSettings `json:"allocation,omitempty"`
}

// RoutingAllocationSettings models the "exclude" and "enable" allocation settings.
type RoutingAllocationSettings struct {
	Exclude AllocationExclude `json:"exclude,omitempty"`
	Enable  string            `json:"enable,omitempty"`
}

// AllocationExclude models the "_name" allocation exclusion setting.
type AllocationExclude struct {
	Name string `json:"_name,omitempty"`
}

// IsShardsAllocationEnabled reports whether shard allocation is enabled, which
// is the case when the "enable" setting is either unset (empty) or "all".
func (s AllocationSettings) IsShardsAllocationEnabled() bool {
	switch s.Cluster.Routing.Allocation.Enable {
	case "", "all":
		return true
	default:
		return false
	}
}

// ClusterRoutingAllocation models a subset of transient allocation settings for an Elasticsearch cluster.
type ClusterRoutingAllocation struct {
	Transient AllocationSettings `json:"transient,omitempty"`
}

// DiscoveryZen set minimum number of master eligible nodes that must be visible to form a cluster.
type DiscoveryZen struct {
	MinimumMasterNodes int `json:"discovery.zen.minimum_master_nodes"`
}

// DiscoveryZenSettings are cluster settings related to the zen discovery mechanism.
type DiscoveryZenSettings struct {
	Transient  DiscoveryZen `json:"transient"`
	Persistent DiscoveryZen `json:"persistent"`
}

// ErrorResponse is an Elasticsearch error response.
type ErrorResponse struct {
	Status int `json:"status"`
	Error  struct {
		CausedBy struct {
			Reason string `json:"reason"`
			Type   string `json:"type"`
		} `json:"caused_by"`
		Reason     string `json:"reason"`
		Type       string `json:"type"`
		StackTrace string `json:"stack_trace,omitempty"`
		RootCause  []struct {
			Reason string `json:"reason"`
			Type   string `json:"type"`
		} `json:"root_cause"`
	} `json:"error"`
}

// ElasticsearchLicenseType the type of a license.
type ElasticsearchLicenseType string

// Supported ElasticsearchLicenseTypes.
const (
	ElasticsearchLicenseTypeBasic      ElasticsearchLicenseType = "basic"
	ElasticsearchLicenseTypeTrial      ElasticsearchLicenseType = "trial"
	ElasticsearchLicenseTypeGold       ElasticsearchLicenseType = "gold"
	ElasticsearchLicenseTypePlatinum   ElasticsearchLicenseType = "platinum"
	ElasticsearchLicenseTypeEnterprise ElasticsearchLicenseType = "enterprise"
)

// ElasticsearchLicenseTypeOrder license types mapped to ints in increasing order of feature sets for sorting purposes.
// A license type that is not listed here maps to 0, the zero value of a map lookup.
var ElasticsearchLicenseTypeOrder = map[ElasticsearchLicenseType]int{
	ElasticsearchLicenseTypeBasic:      1,
	ElasticsearchLicenseTypeTrial:      2,
	ElasticsearchLicenseTypeGold:       3,
	ElasticsearchLicenseTypePlatinum:   4,
	ElasticsearchLicenseTypeEnterprise: 5,
}

// License models the Elasticsearch license applied to a cluster. Signature will be empty on reads. IssueDate, ExpiryTime and Status can be empty on writes.
type License struct { Status string `json:"status,omitempty"` UID string `json:"uid"` Type string `json:"type"` IssueDate *time.Time `json:"issue_date,omitempty"` IssueDateInMillis int64 `json:"issue_date_in_millis"` ExpiryDate *time.Time `json:"expiry_date,omitempty"` ExpiryDateInMillis int64 `json:"expiry_date_in_millis"` MaxNodes int `json:"max_nodes,omitempty"` MaxResourceUnits int `json:"max_resource_units,omitempty"` IssuedTo string `json:"issued_to"` Issuer string `json:"issuer"` StartDateInMillis int64 `json:"start_date_in_millis"` Signature string `json:"signature,omitempty"` } // StartTime is the date as of which this license is valid. func (l License) StartTime() time.Time { return time.Unix(0, l.StartDateInMillis*int64(time.Millisecond)) } // ExpiryTime is the date as of which the license is no longer valid. func (l License) ExpiryTime() time.Time { return time.Unix(0, l.ExpiryDateInMillis*int64(time.Millisecond)) } // IsValid returns true if the license is still valid at the given point in time. func (l License) IsValid(instant time.Time) bool { return (l.StartTime().Equal(instant) || l.StartTime().Before(instant)) && l.ExpiryTime().After(instant) } // IsSupported returns true if the current license type is supported by the given version of Elasticsearch. func (l License) IsSupported(v *version.Version) bool { if l.Type == string(ElasticsearchLicenseTypeEnterprise) && !v.GTE(version.MustParse("7.8.1")) { return false } return true } // LicenseUpdateRequest is the request to apply a license to a cluster. Licenses must contain signature. type LicenseUpdateRequest struct { Licenses []License `json:"licenses"` } // LicenseUpdateResponse is the response to a license update request. 
type LicenseUpdateResponse struct { Acknowledged bool `json:"acknowledged"` // LicenseStatus can be one of 'valid', 'invalid', 'expired' LicenseStatus string `json:"license_status"` } func (lr LicenseUpdateResponse) IsSuccess() bool { return lr.Acknowledged && lr.LicenseStatus == "valid" } // StartTrialResponse is the response to the start trial API call. type StartTrialResponse struct { Acknowledged bool `json:"acknowledged"` TrialWasStarted bool `json:"trial_was_started"` ErrorMessage string `json:"error_message"` } func (sr StartTrialResponse) IsSuccess() bool { return sr.Acknowledged && sr.TrialWasStarted } // LicenseResponse is the response to GET _xpack/license. Licenses won't contain signature. type LicenseResponse struct { License License `json:"license"` } // StartBasicResponse is the response to the start trial API call. type StartBasicResponse struct { Acknowledged bool `json:"acknowledged"` BasicWasStarted bool `json:"basic_was_started"` ErrorMessage string `json:"error_message"` } // Hit represents a single search hit. type Hit struct { Index string `json:"_index"` Type string `json:"_type"` ID string `json:"_id"` Score float64 `json:"_score"` Source map[string]interface{} `json:"_source"` } // Hits are the collections of search hits. type Hits struct { Total json.RawMessage // model when needed Hits []Hit `json:"hits"` } // SearchResults are the results returned from a _search. type SearchResults struct { Took int Hits Hits `json:"hits"` Cluster *Cluster `json:"_clusters,omitempty"` Shards json.RawMessage // model when needed Aggs map[string]json.RawMessage // model when needed } // Cluster models the Elasticsearch response for searches that involve remote clusters. // It can be used to provide more details about a failure. 
type Cluster struct {
	Details map[string]ClusterDetail `json:"details"`
}

// Failures renders every failure of every cluster in c as "<cluster name>: <failure>",
// joined with commas. It returns an empty string if c is nil or if there is no
// failure. Clusters are visited in map iteration order, so the relative order
// of failures from different clusters is unspecified.
func (c *Cluster) Failures() string {
	if c == nil {
		return ""
	}
	var out strings.Builder
	first := true
	for name, detail := range c.Details {
		for i := range detail.Failures {
			if !first {
				out.WriteString(",")
			}
			first = false
			fmt.Fprintf(&out, "%s: %+v", name, detail.Failures[i])
		}
	}
	return out.String()
}

// ClusterDetail holds the status of a single cluster involved in a search,
// along with the shard failures reported for it.
type ClusterDetail struct {
	Status   string `json:"status"`
	Failures []struct {
		Shard  int `json:"shard"`
		Reason struct {
			Type   string `json:"type"`
			Reason string `json:"reason"`
		} `json:"reason"`
	} `json:"failures"`
}

// ShutdownType is the set of different shutdown operation types supported by Elasticsearch.
type ShutdownType string

var (
	// Restart indicates the intent to restart an Elasticsearch node.
	Restart ShutdownType = "restart"
	// Remove indicates the intent to permanently remove a node from the Elasticsearch cluster.
	Remove ShutdownType = "remove"
)

// ShutdownStatus is the set of different status a shutdown requests can have.
type ShutdownStatus string

// Applies is a predicate that checks this status against a given shutdown struct and returns true if they are the same status.
func (status ShutdownStatus) Applies(shutdown NodeShutdown) bool {
	return status == shutdown.Status
}

var (
	// ShutdownInProgress means a shutdown request has been accepted and is being processed in Elasticsearch.
	ShutdownInProgress ShutdownStatus = "IN_PROGRESS"
	// ShutdownComplete means a shutdown request has been processed and the node can be either restarted or taken out
	// of the cluster by an orchestrator.
	ShutdownComplete ShutdownStatus = "COMPLETE"
	// ShutdownStalled means a shutdown request cannot be processed further because of a limiting constraint e.g.
	// no place for shard data to migrate to.
	ShutdownStalled ShutdownStatus = "STALLED"
	// ShutdownNotStarted is an error condition that should never be returned by Elasticsearch and indicates a bug if so.
	ShutdownNotStarted ShutdownStatus = "NOT_STARTED"
)

// ShardMigration is the status of shards that are being migrated away from a node that goes through a shutdown.
type ShardMigration struct {
	Status                   ShutdownStatus `json:"status"`
	ShardMigrationsRemaining int            `json:"shard_migrations_remaining"`
	Explanation              string         `json:"explanation"`
}

// PersistentTasks expresses the status of preparing ongoing persistent tasks for a node shutdown.
type PersistentTasks struct {
	Status ShutdownStatus `json:"status"`
}

// Plugins represents the status of Elasticsearch plugins being prepared for a node shutdown.
type Plugins struct {
	Status ShutdownStatus `json:"status"`
}

// NodeShutdown is the representation of an ongoing shutdown request.
type NodeShutdown struct {
	NodeID                string          `json:"node_id"`
	Type                  string          `json:"type"`
	Reason                string          `json:"reason"`
	ShutdownStartedMillis int             `json:"shutdown_startedmillis"` // missing _ is a serialization inconsistency in Elasticsearch
	Status                ShutdownStatus  `json:"status"`
	ShardMigration        ShardMigration  `json:"shard_migration"`
	PersistentTasks       PersistentTasks `json:"persistent_tasks"`
	Plugins               Plugins         `json:"plugins"`
}

// Is tests a NodeShutdown request whether it is of type t.
// The comparison ignores case.
func (ns NodeShutdown) Is(t ShutdownType) bool {
	// API returns type in capital letters currently
	want := string(t)
	return strings.EqualFold(ns.Type, want)
}

// ShutdownRequest is the body of a node shutdown request.
//
// NOTE(review): encoding/json encodes a time.Duration as an integer number of
// nanoseconds - confirm this is the representation the API expects for
// allocation_delay.
type ShutdownRequest struct {
	Type            ShutdownType  `json:"type"`
	Reason          string        `json:"reason"`
	AllocationDelay time.Duration `json:"allocation_delay,omitempty"`
}

// ShutdownResponse is the response wrapper for retrieving the status of ongoing node shutdowns from Elasticsearch.
type ShutdownResponse struct {
	Nodes []NodeShutdown `json:"nodes"`
}

// ClusterState models the internal representation of the cluster state.
// Only the metadata section is modeled.
type ClusterState struct {
	Metadata Metadata `json:"metadata"`
}

// Metadata models the "metadata" section of the cluster state, limited to the reserved state.
type Metadata struct {
	ReservedState ReservedState `json:"reserved_state"`
}

// ReservedState models the "reserved_state" section of the metadata, limited to the file settings.
type ReservedState struct {
	FileSettings FileSettings `json:"file_settings"`
}

// FileSettings models the "file_settings" section of the reserved state:
// a version and, optionally, the errors attached to it.
type FileSettings struct {
	Version int64 `json:"version"`
	// Errors is nil when the "errors" key is absent.
	Errors *FileSettingsErrors `json:"errors,omitempty"`
}

// FileSettingsErrors models the "errors" section of the file settings.
type FileSettingsErrors struct {
	Version   int64    `json:"version"`
	ErrorKind string   `json:"error_kind"`
	Errors    []string `json:"errors"`
}