pkg/controller/elasticsearch/reconcile/state.go (152 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package reconcile import ( "context" "fmt" "reflect" corev1 "k8s.io/api/core/v1" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/hints" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log" ) // State holds the accumulated state during the reconcile loop including the response and a copy of the // Elasticsearch resource from the start of reconciliation, for status updates. type State struct { *events.Recorder *StatusReporter cluster esv1.Elasticsearch status esv1.ElasticsearchStatus hints hints.OrchestrationsHints } // NewState creates a new reconcile state based on the given cluster func NewState(c esv1.Elasticsearch) (*State, error) { hints, err := hints.NewFromAnnotations(c.Annotations) if err != nil { return nil, err } status := *c.Status.DeepCopy() status.ObservedGeneration = c.Generation // reset the health to 'unknown' so that if reconciliation fails before the observer has had a chance to get it, // we stop reporting a health that may be out of date status.Health = esv1.ElasticsearchUnknownHealth // reset the phase to an empty string so that we do not report an outdated phase given that certain phases are // stickier than others (eg. invalid) status.Phase = "" return &State{ Recorder: events.NewRecorder(), StatusReporter: &StatusReporter{ DownscaleReporter: &DownscaleReporter{}, UpscaleReporter: &UpscaleReporter{}, UpgradeReporter: &UpgradeReporter{}, }, cluster: c, status: status, hints: hints, }, nil } // MustNewState like NewState but panics on error. Use recommended only in test code. func MustNewState(c esv1.Elasticsearch) *State { state, err := NewState(c) if err != nil { panic(err) } return state } func (s *State) fetchMinRunningVersion(ctx context.Context, resourcesState ResourcesState) (*version.Version, error) { log := ulog.FromContext(ctx) minPodVersion, err := version.MinInPods(resourcesState.AllPods, label.VersionLabelName) if err != nil { log.Error(err, "failed to parse running Pods version", "namespace", s.cluster.Namespace, "es_name", s.cluster.Name) return nil, err } minSsetVersion, err := version.MinInStatefulSets(resourcesState.StatefulSets, label.VersionLabelName) if err != nil { log.Error(err, "failed to parse running Pods version", "namespace", s.cluster.Namespace, "es_name", s.cluster.Name) return nil, err } if minPodVersion == nil { return minSsetVersion, nil } if minSsetVersion == nil { return minPodVersion, nil } if minPodVersion.GT(*minSsetVersion) { return minSsetVersion, nil } return minPodVersion, nil } func (s *State) UpdateClusterHealth(clusterHealth esv1.ElasticsearchHealth) *State { if clusterHealth == "" { s.status.Health = esv1.ElasticsearchUnknownHealth return s } s.status.Health = clusterHealth return s } func (s *State) UpdateWithPhase( phase esv1.ElasticsearchOrchestrationPhase, ) *State { switch { // do not overwrite the Invalid marker case s.status.Phase == esv1.ElasticsearchResourceInvalid: return s // do not overwrite non-ready phases like MigratingData case s.status.Phase != "" && phase == esv1.ElasticsearchApplyingChangesPhase: return s } s.status.Phase = phase return s } func (s *State) UpdateAvailableNodes( resourcesState ResourcesState, ) *State { s.status.AvailableNodes = int32(len(AvailableElasticsearchNodes(resourcesState.CurrentPods))) return s } func (s *State) UpdateMinRunningVersion( ctx context.Context, resourcesState ResourcesState, ) *State { lowestVersion, err := s.fetchMinRunningVersion(ctx, resourcesState) // error already handled in fetchMinRunningVersion, move on with the status update if err == nil && lowestVersion != nil { s.status.Version = lowestVersion.String() } // Update the related condition. if s.status.Version == "" { s.ReportCondition(esv1.RunningDesiredVersion, corev1.ConditionUnknown, "No running version reported") return s } desiredVersion, err := version.Parse(s.cluster.Spec.Version) if err != nil { s.ReportCondition(esv1.RunningDesiredVersion, corev1.ConditionUnknown, fmt.Sprintf("Error while parsing desired version: %s", err.Error())) return s } runningVersion, err := version.Parse(s.status.Version) if err != nil { s.ReportCondition(esv1.RunningDesiredVersion, corev1.ConditionUnknown, fmt.Sprintf("Error while parsing running version: %s", err.Error())) return s } if desiredVersion.GT(runningVersion) { s.ReportCondition( esv1.RunningDesiredVersion, corev1.ConditionFalse, fmt.Sprintf("Upgrading from %s to %s", runningVersion.String(), desiredVersion.String()), ) return s } s.ReportCondition(esv1.RunningDesiredVersion, corev1.ConditionTrue, fmt.Sprintf("All nodes are running version %s", runningVersion)) return s } // UpdateElasticsearchInvalidWithEvent is a convenient method to set the phase to esv1.ElasticsearchResourceInvalid // and generate an event at the same time. func (s *State) UpdateElasticsearchInvalidWithEvent(msg string) { s.status.Phase = esv1.ElasticsearchResourceInvalid s.AddEvent(corev1.EventTypeWarning, events.EventReasonValidation, msg) } // Apply takes the current Elasticsearch status, compares it to the previous status, and updates the status accordingly. // It returns the events to emit and an updated version of the Elasticsearch cluster resource with // the current status applied to its status sub-resource. func (s *State) Apply() ([]events.Event, *esv1.Elasticsearch) { previous := s.cluster.Status current := s.MergeStatusReportingWith(s.status) if reflect.DeepEqual(previous, current) { return s.Events(), nil } if current.IsDegraded(previous) { s.AddEvent(corev1.EventTypeWarning, events.EventReasonUnhealthy, "Elasticsearch cluster health degraded") } s.cluster.Status = current return s.Events(), &s.cluster } // UpdateOrchestrationHints updates the orchestration hints collected so far with the hints in hint. func (s *State) UpdateOrchestrationHints(hint hints.OrchestrationsHints) { s.hints = s.hints.Merge(hint) } // OrchestrationHints returns the current annotation hints as maintained in reconciliation state. Initially these will be // populated from the Elasticsearch resource. But after calls to UpdateOrchestrationHints they can deviate from the state // stored in the API server. func (s *State) OrchestrationHints() hints.OrchestrationsHints { return s.hints }