apis/v1alpha1/collector_webhook.go (279 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package v1alpha1
import (
"context"
"fmt"
"github.com/go-logr/logr"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/validation"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/config"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters"
ta "github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/targetallocator/adapters"
"github.com/aws/amazon-cloudwatch-agent-operator/pkg/featuregate"
)
// Compile-time checks that CollectorWebhook satisfies both admission
// interfaces it is registered for.
var (
	_ admission.CustomValidator = (*CollectorWebhook)(nil)
	_ admission.CustomDefaulter = (*CollectorWebhook)(nil)
)
// +kubebuilder:webhook:path=/mutate-cloudwatch-aws-amazon-com-v1alpha1-amazoncloudwatchagent,mutating=true,failurePolicy=fail,groups=cloudwatch.aws.amazon.com,resources=amazoncloudwatchagents,verbs=create;update,versions=v1alpha1,name=mamazoncloudwatchagent.kb.io,sideEffects=none,admissionReviewVersions=v1
// +kubebuilder:webhook:verbs=create;update,path=/validate-cloudwatch-aws-amazon-com-v1alpha1-amazoncloudwatchagent,mutating=false,failurePolicy=fail,groups=cloudwatch.aws.amazon.com,resources=amazoncloudwatchagents,versions=v1alpha1,name=vamazoncloudwatchagentcreateupdate.kb.io,sideEffects=none,admissionReviewVersions=v1
// +kubebuilder:webhook:verbs=delete,path=/validate-cloudwatch-aws-amazon-com-v1alpha1-amazoncloudwatchagent,mutating=false,failurePolicy=ignore,groups=cloudwatch.aws.amazon.com,resources=amazoncloudwatchagents,versions=v1alpha1,name=vamazoncloudwatchagentdelete.kb.io,sideEffects=none,admissionReviewVersions=v1
// +kubebuilder:object:generate=false
// CollectorWebhook implements admission.CustomValidator and
// admission.CustomDefaulter for AmazonCloudWatchAgent resources.
// Instances are created by SetupCollectorWebhook.
type CollectorWebhook struct {
// logger is the manager's logger tagged with handler=CollectorWebhook.
logger logr.Logger
// cfg is the operator configuration handed to SetupCollectorWebhook.
cfg config.Config
// scheme is the runtime scheme obtained from the manager.
scheme *runtime.Scheme
}
// Default applies the operator's defaults to obj, which must be an
// *AmazonCloudWatchAgent. Any other type is rejected with an error.
func (c CollectorWebhook) Default(ctx context.Context, obj runtime.Object) error {
	if agent, isAgent := obj.(*AmazonCloudWatchAgent); isAgent {
		return c.defaulter(agent)
	}
	return fmt.Errorf("expected an AmazonCloudWatchAgent, received %T", obj)
}
// ValidateCreate validates a newly created object, which must be an
// *AmazonCloudWatchAgent. Any other type is rejected with an error.
func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
	switch agent := obj.(type) {
	case *AmazonCloudWatchAgent:
		return c.validate(agent)
	default:
		return nil, fmt.Errorf("expected an AmazonCloudWatchAgent, received %T", obj)
	}
}
// ValidateUpdate validates the new state of an updated object. Only newObj is
// inspected; it must be an *AmazonCloudWatchAgent. oldObj is not consulted.
func (c CollectorWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) {
	switch agent := newObj.(type) {
	case *AmazonCloudWatchAgent:
		return c.validate(agent)
	default:
		return nil, fmt.Errorf("expected an AmazonCloudWatchAgent, received %T", newObj)
	}
}
// ValidateDelete runs the same validation as create/update against the object
// being deleted. obj must be a non-nil *AmazonCloudWatchAgent; anything else
// is rejected with an error.
func (c CollectorWebhook) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
	agent, isAgent := obj.(*AmazonCloudWatchAgent)
	if isAgent && agent != nil {
		return c.validate(agent)
	}
	return nil, fmt.Errorf("expected an AmazonCloudWatchAgent, received %T", obj)
}
// defaulter fills in the fields of r that were left unset and for which the
// operator has a default. It mutates r in place and always returns nil.
func (c CollectorWebhook) defaulter(r *AmazonCloudWatchAgent) error {
	const managedByLabel = "app.kubernetes.io/managed-by"

	spec := &r.Spec

	if len(spec.Mode) == 0 {
		spec.Mode = ModeDeployment
	}
	if len(spec.UpgradeStrategy) == 0 {
		spec.UpgradeStrategy = UpgradeStrategyAutomatic
	}

	if r.Labels == nil {
		r.Labels = make(map[string]string)
	}
	if r.Labels[managedByLabel] == "" {
		r.Labels[managedByLabel] = "amazon-cloudwatch-agent-operator"
	}

	// A single replica is a safe default: the dependent Deployment and
	// HorizontalPodAutoscaler objects default to 1 as well.
	singleReplica := int32(1)
	if spec.Replicas == nil {
		spec.Replicas = &singleReplica
	}
	if spec.TargetAllocator.Replicas == nil && spec.TargetAllocator.Enabled {
		spec.TargetAllocator.Replicas = &singleReplica
	}

	// Autoscaling is considered requested as soon as a maximum replica count
	// is given, either through the deprecated top-level field or through the
	// Autoscaler section.
	autoscalingRequested := spec.MaxReplicas != nil ||
		(spec.Autoscaler != nil && spec.Autoscaler.MaxReplicas != nil)
	if autoscalingRequested {
		if spec.Autoscaler == nil {
			spec.Autoscaler = &AutoscalerSpec{}
		}
		hpa := spec.Autoscaler

		if hpa.MaxReplicas == nil {
			hpa.MaxReplicas = spec.MaxReplicas
		}

		// The minimum falls back to the deprecated top-level MinReplicas and
		// then to the (already defaulted) Replicas.
		switch {
		case hpa.MinReplicas != nil:
			// Explicitly configured; leave untouched.
		case spec.MinReplicas != nil:
			hpa.MinReplicas = spec.MinReplicas
		default:
			hpa.MinReplicas = spec.Replicas
		}

		if hpa.TargetMemoryUtilization == nil && hpa.TargetCPUUtilization == nil {
			cpuTarget := int32(90)
			hpa.TargetCPUUtilization = &cpuTarget
		}
	}

	// Without a user-supplied PodDisruptionBudget, fall back to
	// MaxUnavailable=1. That still allows node drains when only one replica
	// runs, while limiting drain-induced disruption once replicas > 1.
	if spec.PodDisruptionBudget == nil {
		maxUnavailable := intstr.IntOrString{
			Type:   intstr.Int,
			IntVal: 1,
		}
		spec.PodDisruptionBudget = &PodDisruptionBudgetSpec{
			MaxUnavailable: &maxUnavailable,
		}
	}

	ingress := &spec.Ingress
	if ingress.Type == IngressTypeRoute && ingress.Route.Termination == "" {
		ingress.Route.Termination = TLSRouteTerminationTypeEdge
	}
	if ingress.Type == IngressTypeNginx && ingress.RuleType == "" {
		ingress.RuleType = IngressRuleTypePath
	}

	// Users who upgrade the operator without upgrading the CRD end up with no
	// management state. Left empty, it would be treated as unmanaged and stop
	// reconciliation, so default it to managed.
	if len(spec.ManagementState) == 0 {
		spec.ManagementState = ManagementStateManaged
	}

	return nil
}
// validate checks r.Spec for combinations of settings the operator does not
// support.
//
// It returns the warnings collected so far (deprecations and discouraged
// settings) together with the first validation error encountered, or a nil
// error when the spec is acceptable. Checks run in a fixed order and stop at
// the first failure.
func (c CollectorWebhook) validate(r *AmazonCloudWatchAgent) (admission.Warnings, error) {
	warnings := admission.Warnings{}
	spec := &r.Spec

	// validate volumeClaimTemplates
	if spec.Mode != ModeStatefulSet && len(spec.VolumeClaimTemplates) > 0 {
		return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the attribute 'volumeClaimTemplates'", spec.Mode)
	}

	// validate tolerations
	if spec.Mode == ModeSidecar && len(spec.Tolerations) > 0 {
		return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the attribute 'tolerations'", spec.Mode)
	}

	// validate priorityClassName
	if spec.Mode == ModeSidecar && spec.PriorityClassName != "" {
		return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the attribute 'priorityClassName'", spec.Mode)
	}

	// validate affinity
	if spec.Mode == ModeSidecar && spec.Affinity != nil {
		return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the attribute 'affinity'", spec.Mode)
	}

	// validate additional containers
	if spec.Mode == ModeSidecar && len(spec.AdditionalContainers) > 0 {
		return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the attribute 'AdditionalContainers'", spec.Mode)
	}

	// validate target allocation
	if spec.TargetAllocator.Enabled && spec.Mode != ModeStatefulSet {
		warnings = append(warnings, fmt.Sprintf("The Amazon CloudWatch Agent mode is set to %s, we do not recommend enabling Target Allocator when not running as a StatefulSet", spec.Mode))
	}

	// validate Prometheus config for target allocation
	if spec.TargetAllocator.Enabled {
		promConfigYaml, err := spec.Prometheus.Yaml()
		if err != nil {
			// %w keeps the message text unchanged while preserving the error chain.
			return warnings, fmt.Errorf("%w could not convert json to yaml", err)
		}
		promCfg, err := adapters.ConfigFromString(promConfigYaml)
		if err != nil {
			return warnings, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err)
		}
		err = ta.ValidatePromConfig(promCfg, spec.TargetAllocator.Enabled, featuregate.EnableTargetAllocatorRewrite.IsEnabled())
		if err != nil {
			return warnings, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err)
		}
		err = ta.ValidateTargetAllocatorConfig(spec.TargetAllocator.PrometheusCR.Enabled, promCfg)
		if err != nil {
			return warnings, fmt.Errorf("the OpenTelemetry Spec Prometheus configuration is incorrect, %w", err)
		}
	}

	// validate port config
	for _, p := range spec.Ports {
		nameErrs := validation.IsValidPortName(p.Name)
		numErrs := validation.IsValidPortNum(int(p.Port))
		if len(nameErrs) > 0 || len(numErrs) > 0 {
			return warnings, fmt.Errorf("the OpenTelemetry Spec Ports configuration is incorrect, port name '%s' errors: %s, num '%d' errors: %s",
				p.Name, nameErrs, p.Port, numErrs)
		}
	}

	// Resolve the effective maxReplicas: the Autoscaler section wins, the
	// deprecated top-level field is the fallback.
	var maxReplicas *int32
	if spec.Autoscaler != nil && spec.Autoscaler.MaxReplicas != nil {
		maxReplicas = spec.Autoscaler.MaxReplicas
	}
	if maxReplicas == nil && spec.MaxReplicas != nil {
		warnings = append(warnings, "MaxReplicas is deprecated")
		maxReplicas = spec.MaxReplicas
	}

	// Resolve the effective minReplicas the same way, falling back to
	// Replicas when neither location sets it.
	var minReplicas *int32
	if spec.Autoscaler != nil && spec.Autoscaler.MinReplicas != nil {
		minReplicas = spec.Autoscaler.MinReplicas
	}
	if minReplicas == nil {
		if spec.MinReplicas != nil {
			warnings = append(warnings, "MinReplicas is deprecated")
			minReplicas = spec.MinReplicas
		} else {
			minReplicas = spec.Replicas
		}
	}

	// validate ingress/mode combination
	if spec.Ingress.Type == IngressTypeNginx && spec.Mode == ModeSidecar {
		return warnings, fmt.Errorf("the OpenTelemetry Spec Ingress configuration is incorrect. Ingress can only be used in combination with the modes: %s, %s, %s",
			ModeDeployment, ModeDaemonSet, ModeStatefulSet,
		)
	}

	// validate autoscale with horizontal pod autoscaler
	if maxReplicas != nil {
		if *maxReplicas < int32(1) {
			return warnings, fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, maxReplicas should be defined and one or more")
		}
		if spec.Replicas != nil && *spec.Replicas > *maxReplicas {
			return warnings, fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, replicas must not be greater than maxReplicas")
		}
		if minReplicas != nil && *minReplicas > *maxReplicas {
			return warnings, fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, minReplicas must not be greater than maxReplicas")
		}
		if minReplicas != nil && *minReplicas < int32(1) {
			return warnings, fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, minReplicas should be one or more")
		}
		if spec.Autoscaler != nil {
			// Only bail out on failure: a valid autoscaler must not short-circuit
			// the ingress, probe and updateStrategy checks below.
			if err := checkAutoscalerSpec(spec.Autoscaler); err != nil {
				return warnings, err
			}
		}
	}

	// validate ingress hostname for subdomain rules
	if spec.Ingress.RuleType == IngressRuleTypeSubdomain && (spec.Ingress.Hostname == "" || spec.Ingress.Hostname == "*") {
		return warnings, fmt.Errorf("a valid Ingress hostname has to be defined for subdomain ruleType")
	}

	// validate liveness probe
	if probe := spec.LivenessProbe; probe != nil {
		if probe.InitialDelaySeconds != nil && *probe.InitialDelaySeconds < 0 {
			return warnings, fmt.Errorf("the OpenTelemetry Spec LivenessProbe InitialDelaySeconds configuration is incorrect. InitialDelaySeconds should be greater than or equal to 0")
		}
		if probe.PeriodSeconds != nil && *probe.PeriodSeconds < 1 {
			return warnings, fmt.Errorf("the OpenTelemetry Spec LivenessProbe PeriodSeconds configuration is incorrect. PeriodSeconds should be greater than or equal to 1")
		}
		if probe.TimeoutSeconds != nil && *probe.TimeoutSeconds < 1 {
			return warnings, fmt.Errorf("the OpenTelemetry Spec LivenessProbe TimeoutSeconds configuration is incorrect. TimeoutSeconds should be greater than or equal to 1")
		}
		if probe.SuccessThreshold != nil && *probe.SuccessThreshold < 1 {
			return warnings, fmt.Errorf("the OpenTelemetry Spec LivenessProbe SuccessThreshold configuration is incorrect. SuccessThreshold should be greater than or equal to 1")
		}
		if probe.FailureThreshold != nil && *probe.FailureThreshold < 1 {
			return warnings, fmt.Errorf("the OpenTelemetry Spec LivenessProbe FailureThreshold configuration is incorrect. FailureThreshold should be greater than or equal to 1")
		}
		if probe.TerminationGracePeriodSeconds != nil && *probe.TerminationGracePeriodSeconds < 1 {
			return warnings, fmt.Errorf("the OpenTelemetry Spec LivenessProbe TerminationGracePeriodSeconds configuration is incorrect. TerminationGracePeriodSeconds should be greater than or equal to 1")
		}
	}

	// validate updateStrategy for DaemonSet
	if spec.Mode != ModeDaemonSet && len(spec.UpdateStrategy.Type) > 0 {
		return warnings, fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the attribute 'updateStrategy'", spec.Mode)
	}

	return warnings, nil
}
// checkAutoscalerSpec validates the user-supplied autoscaler settings.
//
// It verifies that:
//   - scale-up/scale-down stabilization windows, when set, are at least 1;
//   - CPU and memory utilization targets, when set, lie in [1, 99];
//   - every custom metric is of the Pods source type, carries a pods metric
//     source, and has a Value or AverageValue target greater than 0.
//
// It returns nil when all checks pass and a descriptive error for the first
// violation otherwise. autoscaler must be non-nil.
func checkAutoscalerSpec(autoscaler *AutoscalerSpec) error {
	if behavior := autoscaler.Behavior; behavior != nil {
		if behavior.ScaleDown != nil && behavior.ScaleDown.StabilizationWindowSeconds != nil &&
			*behavior.ScaleDown.StabilizationWindowSeconds < int32(1) {
			return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, scaleDown should be one or more")
		}
		if behavior.ScaleUp != nil && behavior.ScaleUp.StabilizationWindowSeconds != nil &&
			*behavior.ScaleUp.StabilizationWindowSeconds < int32(1) {
			return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, scaleUp should be one or more")
		}
	}

	if cpu := autoscaler.TargetCPUUtilization; cpu != nil && (*cpu < int32(1) || *cpu > int32(99)) {
		return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, targetCPUUtilization should be greater than 0 and less than 100")
	}
	if mem := autoscaler.TargetMemoryUtilization; mem != nil && (*mem < int32(1) || *mem > int32(99)) {
		return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, targetMemoryUtilization should be greater than 0 and less than 100")
	}

	for _, metric := range autoscaler.Metrics {
		if metric.Type != autoscalingv2.PodsMetricSourceType {
			return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, metric type unsupported. Expected metric of source type Pod")
		}
		// Reject a Pods-typed metric that omits its pods section instead of
		// dereferencing a nil pointer below.
		if metric.Pods == nil {
			return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, pods metric source should be defined")
		}

		// pod metrics target only support value and averageValue.
		target := metric.Pods.Target
		switch target.Type {
		case autoscalingv2.AverageValueMetricType:
			// A missing quantity is treated like a non-positive one.
			if target.AverageValue == nil {
				return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, average value should be greater than 0")
			}
			if val, ok := target.AverageValue.AsInt64(); !ok || val < int64(1) {
				return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, average value should be greater than 0")
			}
		case autoscalingv2.ValueMetricType:
			// A missing quantity is treated like a non-positive one.
			if target.Value == nil {
				return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, value should be greater than 0")
			}
			if val, ok := target.Value.AsInt64(); !ok || val < int64(1) {
				return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, value should be greater than 0")
			}
		default:
			return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, invalid pods target type")
		}
	}

	return nil
}
// SetupCollectorWebhook builds a CollectorWebhook from the manager's logger
// and scheme plus the given configuration, and registers it with mgr as both
// validator and defaulter for AmazonCloudWatchAgent resources. It returns the
// error reported by the webhook builder, if any.
func SetupCollectorWebhook(mgr ctrl.Manager, cfg config.Config) error {
	handlerLogger := mgr.GetLogger().WithValues("handler", "CollectorWebhook")
	handler := &CollectorWebhook{
		logger: handlerLogger,
		scheme: mgr.GetScheme(),
		cfg:    cfg,
	}

	builder := ctrl.NewWebhookManagedBy(mgr).For(&AmazonCloudWatchAgent{})
	return builder.WithValidator(handler).WithDefaulter(handler).Complete()
}