pkg/apis/common/v1alpha1/autoscaling_status.go (269 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package v1alpha1
import (
	"fmt"
	"sort"
	"strings"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/set"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/stringsutil"
)
// Event types that can be recorded against an autoscaling policy with
// AutoscalingPolicyStatusBuilder.RecordEvent. They end up in the policy status as PolicyState.Type.
//
// AutoscalingStatusBuilder.Build splits them in two groups: HorizontalScalingLimitReached and
// VerticalScalingLimitReached feed the "Limited" condition, every other type causes the policy to be
// listed in the "Healthy" condition, which is then reported as False.
const (
	CPURequired AutoscalingEventType = "CPURequired"
	EmptyResponse AutoscalingEventType = "EmptyResponse"
	// HorizontalScalingLimitReached is one of the two types reported through the "Limited" condition.
	HorizontalScalingLimitReached AutoscalingEventType = "HorizontalScalingLimitReached"
	MemoryRequired AutoscalingEventType = "MemoryRequired"
	NoNodeSet AutoscalingEventType = "NoNodeSet"
	OverlappingPolicies AutoscalingEventType = "OverlappingPolicies"
	StorageRequired AutoscalingEventType = "StorageRequired"
	UnexpectedNodeStorageCapacity AutoscalingEventType = "UnexpectedNodeStorageCapacity"
	// VerticalScalingLimitReached is one of the two types reported through the "Limited" condition.
	VerticalScalingLimitReached AutoscalingEventType = "VerticalScalingLimitReached"
	// NotOnlineErrorMessage is a generic error message to be used in Conditions. It's used to let the user know that
	// the Elasticsearch autoscaling API was not used in the last reconciliation attempt.
	// AutoscalingStatusBuilder.Build puts it at the start of the "Healthy" condition message when the
	// online status is unset or false.
	NotOnlineErrorMessage string = "An error prevented resource calculation from the Elasticsearch autoscaling API"
)
// Condition types reported in ElasticsearchAutoscalerStatus.Conditions. AutoscalingStatusBuilder.Build
// emits one condition of each type.
const (
	// ElasticsearchAutoscalerActive status is True when the ElasticsearchAutoscaler resource is managed by the operator and the target
	// Elasticsearch cluster does exist. It makes it possible to attempt to calculate the required compute and storage resources
	// for the targeted cluster.
	// AutoscalingStatusBuilder.Build always reports it as True.
	ElasticsearchAutoscalerActive ConditionType = "Active"
	// ElasticsearchAutoscalerHealthy status is True if resources have been calculated for all the autoscaling policies
	// and no error has been encountered during the reconciliation process.
	// The fact that this condition is False does not necessarily imply that the calculation of resources has failed
	// for all the tiers.
	ElasticsearchAutoscalerHealthy ConditionType = "Healthy"
	// ElasticsearchAutoscalerLimited status is True when a resource limit is reached, that is when at least one
	// policy recorded a HorizontalScalingLimitReached or VerticalScalingLimitReached event.
	ElasticsearchAutoscalerLimited ConditionType = "Limited"
	// ElasticsearchAutoscalerOnline status is True if the Elasticsearch API is available.
	// For example, it is expected for this condition to be False if the cluster is being bootstrapped, however it should
	// become True when the operator is able to connect to Elasticsearch.
	ElasticsearchAutoscalerOnline ConditionType = "Online"
)
// ElasticsearchAutoscalerStatus is the status reported for an Elasticsearch autoscaler: the last
// generation observed by the controller, the conditions, and one status entry per autoscaling policy.
type ElasticsearchAutoscalerStatus struct {
	// ObservedGeneration is the last observed generation by the controller.
	// It is a pointer so that an unset value is omitted from the JSON output.
	ObservedGeneration *int64 `json:"observedGeneration,omitempty"`
	// Conditions holds the current service state of the autoscaling controller.
	// +kubebuilder:validation:Optional
	Conditions Conditions `json:"conditions,omitempty"`
	// AutoscalingPolicyStatuses is used to expose state messages to user or external system.
	// It holds one entry per autoscaling policy and is serialized under the "policies" key.
	// +kubebuilder:validation:Optional
	AutoscalingPolicyStatuses []AutoscalingPolicyStatus `json:"policies"`
}
// AutoscalingPolicyStatus is the status of a single autoscaling policy: the node count of each
// nodeSet, the resources shared by those nodeSets, the recorded state messages and the time of the
// last resource modification.
type AutoscalingPolicyStatus struct {
	// Name is the name of the autoscaling policy
	Name string `json:"name"`
	// NodeSetNodeCount holds the number of nodes for each nodeSet.
	// +kubebuilder:validation:Optional
	NodeSetNodeCount NodeSetNodeCountList `json:"nodeSets"`
	// ResourcesSpecification holds the resource values common to all the nodeSets managed by the same autoscaling policy.
	// Only the resources managed by the autoscaling controller are saved in the Status.
	// +kubebuilder:validation:Optional
	ResourcesSpecification NodeResources `json:"resources"`
	// PolicyStates may contain various messages regarding the current state of this autoscaling policy.
	// There is one entry per recorded event type.
	// +kubebuilder:validation:Optional
	PolicyStates []PolicyState `json:"state"`
	// LastModificationTime is the last time the resources have been updated, used by the cooldown algorithm.
	// +kubebuilder:validation:Optional
	LastModificationTime metav1.Time `json:"lastModificationTime"`
}
// CurrentResourcesForPolicy looks up the status entry of the autoscaling policy named policyName and
// returns the resources stored in it (policy name, node count per nodeSet, node resources).
// The boolean is false, and the returned value empty, when the status holds no entry for that policy.
// When several entries share the same name the first one is used.
func (s *ElasticsearchAutoscalerStatus) CurrentResourcesForPolicy(policyName string) (NodeSetsResources, bool) {
	for i := range s.AutoscalingPolicyStatuses {
		candidate := s.AutoscalingPolicyStatuses[i]
		if candidate.Name != policyName {
			continue
		}
		resources := NodeSetsResources{
			Name:             candidate.Name,
			NodeSetNodeCount: candidate.NodeSetNodeCount,
			NodeResources:    candidate.ResourcesSpecification,
		}
		return resources, true
	}
	return NodeSetsResources{}, false
}
// LastModificationTime returns the last modification time stored in the status for the autoscaling
// policy named policyName. The boolean is false, and the returned time is the zero value, when the
// status holds no entry for that policy. When several entries share the same name the first one is used.
func (s *ElasticsearchAutoscalerStatus) LastModificationTime(policyName string) (metav1.Time, bool) {
	for i := range s.AutoscalingPolicyStatuses {
		if status := s.AutoscalingPolicyStatuses[i]; status.Name == policyName {
			return status.LastModificationTime, true
		}
	}
	var unset metav1.Time
	return unset, false
}
// AutoscalingPolicyStatusBuilder accumulates the pieces of a single policy status (resources, last
// modification time, recorded events) before they are turned into an AutoscalingPolicyStatus by Build.
// +kubebuilder:object:generate=false
type AutoscalingPolicyStatusBuilder struct {
	// policyName is the name of the autoscaling policy this builder is for.
	policyName string
	// nodeSetsResources provides the node counts and node resources copied into the status.
	nodeSetsResources NodeSetsResources
	// lastModificationTime is copied as is into the status.
	lastModificationTime metav1.Time
	// states holds the recorded events, one entry per event type.
	states map[AutoscalingEventType]PolicyState
}
// NewAutoscalingPolicyStatusBuilder returns a status builder for the autoscaling policy called name.
// The builder starts without any recorded event, resources or modification time.
func NewAutoscalingPolicyStatusBuilder(name string) *AutoscalingPolicyStatusBuilder {
	builder := new(AutoscalingPolicyStatusBuilder)
	builder.policyName = name
	// The map must be allocated up front: events are recorded by writing into it.
	builder.states = map[AutoscalingEventType]PolicyState{}
	return builder
}
// Build returns the AutoscalingPolicyStatus described by the builder: the policy name, the node counts
// and node resources taken from the resources set on the builder, the last modification time, and one
// PolicyState per recorded event type.
//
// The policy states are ordered by event type. Ranging over the underlying map directly would produce
// a different ordering from one call to the next, and therefore a status that appears to change even
// though nothing was recorded differently.
// PolicyStates is never nil, even when no event has been recorded.
func (psb *AutoscalingPolicyStatusBuilder) Build() AutoscalingPolicyStatus {
	eventTypes := make([]AutoscalingEventType, 0, len(psb.states))
	for eventType := range psb.states {
		eventTypes = append(eventTypes, eventType)
	}
	sort.Slice(eventTypes, func(i, j int) bool { return eventTypes[i] < eventTypes[j] })

	policyStates := make([]PolicyState, 0, len(eventTypes))
	for _, eventType := range eventTypes {
		state := psb.states[eventType]
		policyStates = append(policyStates, PolicyState{
			Type:     state.Type,
			Messages: state.Messages,
		})
	}

	return AutoscalingPolicyStatus{
		Name:                   psb.policyName,
		NodeSetNodeCount:       psb.nodeSetsResources.NodeSetNodeCount,
		ResourcesSpecification: psb.nodeSetsResources.NodeResources,
		LastModificationTime:   psb.lastModificationTime,
		PolicyStates:           policyStates,
	}
}
// SetNodeSetsResources sets the compute resources associated to a tier.
// Any value set earlier is replaced. Build copies the node counts and the node resources of
// nodeSetsResources into the policy status. The builder is returned to allow call chaining.
func (psb *AutoscalingPolicyStatusBuilder) SetNodeSetsResources(nodeSetsResources NodeSetsResources) *AutoscalingPolicyStatusBuilder {
	psb.nodeSetsResources = nodeSetsResources
	return psb
}
// SetLastModificationTime sets the time reported in the policy status as the last modification of the
// resources. Any value set earlier is replaced. The builder is returned to allow call chaining.
func (psb *AutoscalingPolicyStatusBuilder) SetLastModificationTime(lastModificationTime metav1.Time) *AutoscalingPolicyStatusBuilder {
	psb.lastModificationTime = lastModificationTime
	return psb
}
// RecordEvent adds message to the list of messages kept for the event type stateType, creating the
// entry for that type when it is the first message recorded for it. Messages are kept in recording
// order. The builder is returned to allow call chaining.
func (psb *AutoscalingPolicyStatusBuilder) RecordEvent(stateType AutoscalingEventType, message string) *AutoscalingPolicyStatusBuilder {
	state, known := psb.states[stateType]
	if !known {
		state = PolicyState{Type: stateType}
	}
	state.Messages = append(state.Messages, message)
	// Map values are copies, the updated state has to be written back.
	psb.states[stateType] = state
	return psb
}
// AutoscalingEventType identifies the kind of event recorded against an autoscaling policy.
// It is the value exposed as PolicyState.Type.
type AutoscalingEventType string
// PolicyState groups the messages recorded for one type of event on an autoscaling policy.
type PolicyState struct {
	// Type is the type of event the messages relate to.
	Type AutoscalingEventType `json:"type"`
	// Messages holds the messages recorded for that type, in recording order.
	Messages []string `json:"messages"`
}
// AutoscalingStatusBuilder accumulates what is needed to produce an ElasticsearchAutoscalerStatus:
// the online status and one AutoscalingPolicyStatusBuilder per autoscaling policy.
// +kubebuilder:object:generate=false
type AutoscalingStatusBuilder struct {
	// Surface specific autoscaling events
	// Both sets hold policy names and are filled by Build: scalingLimitEvents with the policies which
	// recorded a horizontal or vertical scaling limit event, nonScalingLimitEvents with the policies
	// which recorded any other type of event.
	scalingLimitEvents set.StringSet
	nonScalingLimitEvents set.StringSet
	// Online/Offline status
	// online stays nil until SetOnline is called.
	online *bool
	onlineMessage string
	// Policies statuses
	// Indexed by policy name, entries are created on demand by ForPolicy.
	policyStatusBuilder map[string]*AutoscalingPolicyStatusBuilder
}
// NewAutoscalingStatusBuilder returns an empty autoscaling status builder: no policy is registered,
// both event sets are empty and the online status is left unset.
func NewAutoscalingStatusBuilder() *AutoscalingStatusBuilder {
	builder := new(AutoscalingStatusBuilder)
	builder.scalingLimitEvents = set.Make()
	builder.nonScalingLimitEvents = set.Make()
	builder.policyStatusBuilder = map[string]*AutoscalingPolicyStatusBuilder{}
	return builder
}
// SetOnline records the online status, and the message that goes with it, to be reported in the
// "Online" condition by Build. The builder is returned to allow call chaining.
//
// The message is stored as is when it is at most 256 bytes long. A longer message is shortened with
// stringsutil.Truncate and suffixed with "[...]" to make the truncation visible.
func (asb *AutoscalingStatusBuilder) SetOnline(isOnline bool, message string) *AutoscalingStatusBuilder {
	// Arbitrary cap, meant to avoid a full stacktrace in the resource status.
	const maxMessageLength = 256

	asb.online = &isOnline
	// "<=" rather than "<": a message of exactly maxMessageLength bytes is not shortened and must
	// therefore not get the truncation marker.
	if len(message) <= maxMessageLength {
		asb.onlineMessage = message
		return asb
	}
	// NOTE(review): assumes stringsutil.Truncate returns its input unchanged when it already fits in
	// the given length - confirm against the helper.
	asb.onlineMessage = stringsutil.Truncate(message, maxMessageLength) + "[...]"
	return asb
}
// UpdateResources stores, for every policy found in nextClusterResources, the resources to report in
// the status together with the matching last modification time.
//
// The last modification time of a policy is carried over from currentAutoscalingStatus as long as the
// resources reported there are the same as the new ones. It is set to the current time when the policy
// is not part of currentAutoscalingStatus or when its resources differ.
// The builder is returned to allow call chaining.
func (asb *AutoscalingStatusBuilder) UpdateResources(
	nextClusterResources ClusterResources,
	currentAutoscalingStatus ElasticsearchAutoscalerStatus,
) *AutoscalingStatusBuilder {
	// A single timestamp is shared by all the policies modified in this call.
	timestamp := metav1.Now()
	for _, next := range nextClusterResources {
		policy := asb.ForPolicy(next.Name)
		policy.SetNodeSetsResources(next)

		// Start from the timestamp already known for this policy, if any.
		if previous, found := currentAutoscalingStatus.LastModificationTime(next.Name); found {
			policy.SetLastModificationTime(previous)
		}

		current, found := currentAutoscalingStatus.CurrentResourcesForPolicy(next.Name)
		if found && current.SameResources(next) {
			// Nothing changed, the timestamp restored above is kept.
			continue
		}
		policy.SetLastModificationTime(timestamp)
	}
	return asb
}
// ForPolicy returns the status builder of the autoscaling policy named policyName. The builder is
// created and registered on first use, later calls with the same name return the same builder.
func (asb *AutoscalingStatusBuilder) ForPolicy(policyName string) *AutoscalingPolicyStatusBuilder {
	builder, registered := asb.policyStatusBuilder[policyName]
	if !registered {
		builder = NewAutoscalingPolicyStatusBuilder(policyName)
		asb.policyStatusBuilder[policyName] = builder
	}
	return builder
}
// Build assembles the ElasticsearchAutoscalerStatus from the state accumulated in the builder.
//
// The returned status holds one AutoscalingPolicyStatus per registered policy, ordered by policy name
// so that the result does not depend on map iteration order, and the following conditions, all of
// them stamped with the same transition time:
//   - Limited: True when at least one policy recorded a VerticalScalingLimitReached or
//     HorizontalScalingLimitReached event, False otherwise.
//   - Healthy: False when at least one policy recorded any other type of event, or when the online
//     status is unset or false; True otherwise.
//   - Active: always True.
//   - Online: Unknown when SetOnline has not been called, otherwise the recorded status and message.
//
// ObservedGeneration is left unset. As a side effect the names of the policies with recorded events
// are added to the event sets held by the builder.
func (asb *AutoscalingStatusBuilder) Build() ElasticsearchAutoscalerStatus {
	policyNames := make([]string, 0, len(asb.policyStatusBuilder))
	for policyName := range asb.policyStatusBuilder {
		policyNames = append(policyNames, policyName)
	}
	sort.Strings(policyNames)

	// Kept non-nil even when empty, as the field is serialized without omitempty.
	policyStates := make([]AutoscalingPolicyStatus, 0, len(policyNames))
	for _, policyName := range policyNames {
		policyStateBuilder := asb.policyStatusBuilder[policyName]
		for eventType := range policyStateBuilder.states {
			if eventType == VerticalScalingLimitReached || eventType == HorizontalScalingLimitReached {
				asb.scalingLimitEvents.Add(policyStateBuilder.policyName)
			} else {
				asb.nonScalingLimitEvents.Add(policyStateBuilder.policyName)
			}
		}
		policyStates = append(policyStates, policyStateBuilder.Build())
	}

	now := metav1.Now()
	var conditions Conditions

	// Update the ElasticsearchAutoscalerLimited condition.
	limited := Condition{
		Type:               ElasticsearchAutoscalerLimited,
		Status:             corev1.ConditionFalse,
		LastTransitionTime: now,
	}
	if asb.scalingLimitEvents.Count() > 0 {
		limited.Status = corev1.ConditionTrue
		limited.Message = fmt.Sprintf("Limit reached for policies %s", strings.Join(asb.scalingLimitEvents.AsSortedSlice(), ","))
	}
	conditions = conditions.MergeWith(limited)

	// Update the ElasticsearchAutoscalerHealthy condition if there is any other event to report.
	healthy := Condition{
		Type:               ElasticsearchAutoscalerHealthy,
		Status:             corev1.ConditionTrue,
		LastTransitionTime: now,
	}
	if asb.nonScalingLimitEvents.Count() > 0 {
		healthy.Status = corev1.ConditionFalse
		healthy.Message = fmt.Sprintf(
			"Issues reported for the following policies: [%s]. Check operator logs, Kubernetes events, and policies status for more details",
			strings.Join(asb.nonScalingLimitEvents.AsSortedSlice(), ","),
		)
	}
	conditions = conditions.MergeWith(healthy)

	// Set active status
	conditions = conditions.MergeWith(
		Condition{
			Type:               ElasticsearchAutoscalerActive,
			Status:             corev1.ConditionTrue,
			LastTransitionTime: now,
		},
	)

	// Set online status
	isOffline := asb.online == nil || !*asb.online
	online := Condition{
		Type:               ElasticsearchAutoscalerOnline,
		LastTransitionTime: now,
	}
	switch {
	case asb.online == nil:
		// Unlikely to happen since online status should be set early by the driver.
		online.Status = corev1.ConditionUnknown
	case *asb.online:
		// The operator attempted a call to the Elasticsearch API
		online.Status = corev1.ConditionTrue
		online.Message = asb.onlineMessage
	default:
		// The operator did not attempt a call to the Elasticsearch API, or the call failed.
		online.Status = corev1.ConditionFalse
		online.Message = asb.onlineMessage
	}
	conditions = conditions.MergeWith(online)

	if isOffline {
		// Also update the healthy condition if not online, keeping the message computed above.
		var healthyCondition Condition
		// Index 0 is a valid position, hence ">= 0" and not "> 0".
		// NOTE(review): assumes Conditions.Index returns a negative value when the type is absent - confirm.
		if healthyConditionIdx := conditions.Index(ElasticsearchAutoscalerHealthy); healthyConditionIdx >= 0 {
			healthyCondition = conditions[healthyConditionIdx]
		}
		conditions = conditions.MergeWith(
			Condition{
				Type:               ElasticsearchAutoscalerHealthy,
				Status:             corev1.ConditionFalse,
				LastTransitionTime: now,
				Message:            strings.TrimSpace(fmt.Sprintf("%s. %s", NotOnlineErrorMessage, healthyCondition.Message)),
			},
		)
	}

	return ElasticsearchAutoscalerStatus{
		Conditions:                conditions,
		AutoscalingPolicyStatuses: policyStates,
	}
}