pkg/controller/elasticsearch/reconcile/status.go (255 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package reconcile
import (
"reflect"
"sort"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
commonv1alpha1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1alpha1"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset"
esclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/shutdown"
)
type StatusReporter struct {
commonv1alpha1.Conditions
*UpscaleReporter
*DownscaleReporter
*UpgradeReporter
}
// MergeStatusReportingWith creates a new ElasticsearchStatus merging the reported status and an existing ElasticsearchStatus.
func (s *StatusReporter) MergeStatusReportingWith(otherStatus esv1.ElasticsearchStatus) esv1.ElasticsearchStatus {
mergedStatus := otherStatus.DeepCopy()
mergedStatus.UpgradeOperation = s.UpgradeReporter.Merge(otherStatus.UpgradeOperation)
mergedStatus.UpscaleOperation = s.UpscaleReporter.Merge(otherStatus.UpscaleOperation)
mergedStatus.DownscaleOperation = s.DownscaleReporter.Merge(otherStatus.DownscaleOperation)
// Merge conditions
for _, condition := range s.Conditions {
mergedStatus.Conditions = mergedStatus.Conditions.MergeWith(condition)
}
return *mergedStatus
}
// ReportCondition records a condition to be reported in the status.
// Any existing condition with the same Type is overridden.
func (s *StatusReporter) ReportCondition(
conditionType commonv1alpha1.ConditionType,
status corev1.ConditionStatus,
message string) {
s.Conditions = s.Conditions.MergeWith(commonv1alpha1.Condition{
Type: conditionType,
Status: status,
LastTransitionTime: metav1.Now(),
Message: message,
})
}
// -- Upscale status
type UpscaleReporter struct {
// Expected nodes to be upscaled
nodes map[string]esv1.NewNode
}
// RecordNewNodes records pending node creations.
func (u *UpscaleReporter) RecordNewNodes(nodes []string) {
if u == nil {
return
}
if u.nodes == nil {
u.nodes = make(map[string]esv1.NewNode, len(nodes))
}
for _, node := range nodes {
newNode := u.nodes[node]
newNode.Name = node
newNode.Status = esv1.NewNodePending
u.nodes[node] = newNode
}
}
// UpdateNodesStatuses updates the status and the message fields for a set of nodes belonging to the same StatefulSet.
func (u *UpscaleReporter) UpdateNodesStatuses(status esv1.NewNodeStatus, statefulSetName, message string, minOrdinal, maxOrdinal int32) {
if u == nil {
return
}
if u.nodes == nil {
u.nodes = make(map[string]esv1.NewNode)
}
for ord := minOrdinal - 1; ord < maxOrdinal; ord++ {
podName := sset.PodName(statefulSetName, ord)
newNode := u.nodes[podName]
newNode.Status = status
newNode.Message = ptr.To[string](message)
u.nodes[podName] = newNode
}
}
// HasPendingNewNodes returns true if at least one pending node creation has been reported.
func (u *UpscaleReporter) HasPendingNewNodes() bool {
if u == nil {
return false
}
return len(u.nodes) > 0
}
// Merge creates a new upscale status using the reported upscale status and an existing upscale status.
func (u *UpscaleReporter) Merge(other esv1.UpscaleOperation) esv1.UpscaleOperation {
upscaleOperation := other.DeepCopy()
if u == nil {
return *upscaleOperation
}
var nodes []esv1.NewNode
if len(u.nodes) != 0 {
nodes = make([]esv1.NewNode, 0, len(u.nodes))
for name, node := range u.nodes {
nodes = append(nodes, esv1.NewNode{
Name: name,
Status: node.Status,
Message: node.Message,
})
}
// Sort for stable comparison
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].Name < nodes[j].Name
})
}
if (u.nodes != nil && !reflect.DeepEqual(nodes, other.Nodes)) || upscaleOperation.LastUpdatedTime.IsZero() {
upscaleOperation.Nodes = nodes
upscaleOperation.LastUpdatedTime = metav1.Now()
}
return *upscaleOperation
}
// -- Upgrade status
type UpgradeReporter struct {
// Expected nodes to be upgraded, key is node name
nodes map[string]esv1.UpgradedNode
}
// RecordNodesToBeUpgraded records in the status a list of nodes that should be upgraded.
func (u *UpgradeReporter) RecordNodesToBeUpgraded(nodes []string) {
u.RecordNodesToBeUpgradedWithMessage(nodes, "")
}
func (u *UpgradeReporter) recordNodesUpgrade(nodes []string, status string, message string) {
if u == nil {
return
}
if u.nodes == nil {
u.nodes = make(map[string]esv1.UpgradedNode, len(nodes))
}
for _, node := range nodes {
upgradedNode := u.nodes[node]
upgradedNode.Name = node
upgradedNode.Status = status
if len(message) > 0 {
upgradedNode.Message = ptr.To[string](message)
}
u.nodes[node] = upgradedNode
}
}
// RecordNodesToBeUpgradedWithMessage records in the status a list of nodes that should be upgraded
// with an additional message to give more information when relevant.
func (u *UpgradeReporter) RecordNodesToBeUpgradedWithMessage(nodes []string, message string) {
u.recordNodesUpgrade(nodes, "PENDING", message)
}
// RecordDeletedNode records a node being deleted for upgrade.
func (u *UpgradeReporter) RecordDeletedNode(node, message string) {
u.recordNodesUpgrade([]string{node}, "DELETED", message)
}
// RecordPredicatesResult records predicates results for a set of nodes.
func (u *UpgradeReporter) RecordPredicatesResult(predicatesResult map[string]string) {
if u == nil {
return
}
if u.nodes == nil {
u.nodes = make(map[string]esv1.UpgradedNode, len(predicatesResult))
}
for node, predicate := range predicatesResult {
upgradedNode := u.nodes[node]
upgradedNode.Name = node
upgradedNode.Predicate = ptr.To[string](predicate)
upgradedNode.Message = ptr.To[string]("Cannot restart node because of failed predicate")
u.nodes[node] = upgradedNode
}
}
// Merge creates a new upgrade status using the reported upgrade status and an existing upgrade status.
func (u *UpgradeReporter) Merge(other esv1.UpgradeOperation) esv1.UpgradeOperation {
upgradeOperation := other.DeepCopy()
if u == nil {
return *upgradeOperation
}
var nodes []esv1.UpgradedNode
if len(u.nodes) != 0 {
nodes = make([]esv1.UpgradedNode, 0, len(u.nodes))
for _, node := range u.nodes {
nodes = append(nodes, esv1.UpgradedNode{
Name: node.Name,
Predicate: node.Predicate,
Message: node.Message,
Status: node.Status,
})
}
// Sort for stable comparison
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].Name < nodes[j].Name
})
}
if (u.nodes != nil && !reflect.DeepEqual(nodes, other.Nodes)) || upgradeOperation.LastUpdatedTime.IsZero() {
upgradeOperation.Nodes = nodes
upgradeOperation.LastUpdatedTime = metav1.Now()
}
return *upgradeOperation
}
// -- Downscale status
type DownscaleReporter struct {
// Expected nodes to be downscaled, key is node name
nodes map[string]esv1.DownscaledNode
stalled *bool
}
// RecordNodesToBeRemoved records nodes expected to be eventually removed from the cluster.
func (d *DownscaleReporter) RecordNodesToBeRemoved(nodes []string) {
if d == nil {
return
}
if d.nodes == nil {
d.nodes = make(map[string]esv1.DownscaledNode, len(nodes))
}
for _, node := range nodes {
d.nodes[node] = esv1.DownscaledNode{
Name: node,
// We set an initial value to let the caller know that this node should be eventually deleted.
// This should be overridden by the downscale algorithm.
ShutdownStatus: "NOT_STARTED",
}
}
}
// Merge creates a new downscale status using the reported downscale status and an existing downscale status.
func (d *DownscaleReporter) Merge(other esv1.DownscaleOperation) esv1.DownscaleOperation {
downscaleOperation := other.DeepCopy()
if d == nil {
return other
}
var nodes []esv1.DownscaledNode
if len(d.nodes) != 0 {
nodes = make([]esv1.DownscaledNode, 0, len(d.nodes))
for _, node := range d.nodes {
nodes = append(nodes, esv1.DownscaledNode{
Name: node.Name,
ShutdownStatus: node.ShutdownStatus,
Explanation: node.Explanation,
})
}
// Sort for stable comparison
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].Name < nodes[j].Name
})
}
if (d.nodes != nil && !reflect.DeepEqual(nodes, other.Nodes)) || downscaleOperation.LastUpdatedTime.IsZero() {
downscaleOperation.Nodes = nodes
downscaleOperation.LastUpdatedTime = metav1.Now()
}
if !reflect.DeepEqual(d.stalled, other.Stalled) {
downscaleOperation.Stalled = d.stalled
downscaleOperation.LastUpdatedTime = metav1.Now()
}
return *downscaleOperation
}
func (d *DownscaleReporter) OnShutdownStatus(
podName string,
nodeShutdownStatus shutdown.NodeShutdownStatus,
) {
if d == nil {
return
}
if d.nodes == nil {
d.nodes = make(map[string]esv1.DownscaledNode)
}
node := d.nodes[podName]
node.Name = podName
node.ShutdownStatus = string(nodeShutdownStatus.Status)
if len(nodeShutdownStatus.Explanation) > 0 {
node.Explanation = ptr.To[string](nodeShutdownStatus.Explanation)
}
d.nodes[podName] = node
if nodeShutdownStatus.Status == esclient.ShutdownStalled {
d.stalled = ptr.To[bool](true)
}
}
func (d *DownscaleReporter) OnReconcileShutdowns(leavingNodes []string) {
if d == nil {
return
}
if d.nodes == nil {
d.nodes = make(map[string]esv1.DownscaledNode)
}
// Update InProgress condition and DownscaleOperation
for _, nodeName := range leavingNodes {
node := d.nodes[nodeName]
node.Name = nodeName
node.ShutdownStatus = string(esclient.ShutdownInProgress)
d.nodes[nodeName] = node
}
}