pkg/controller/elasticsearch/validation/validations.go (318 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package validation import ( "context" "fmt" "net" "strings" "k8s.io/apimachinery/pkg/util/validation/field" commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/license" stackmon "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/stackmon/validations" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" esversion "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log" netutil "github.com/elastic/cloud-on-k8s/v3/pkg/utils/net" ) const ( cfgInvalidMsg = "Configuration invalid" duplicateNodeSets = "NodeSet names must be unique" invalidNamesErrMsg = "Elasticsearch configuration would generate resources with invalid names" invalidSanIPErrMsg = "Invalid SAN IP address. Must be a valid IPv4 address" masterRequiredMsg = "Elasticsearch needs to have at least one master node" mixedRoleConfigMsg = "Detected a combination of node.roles and %s. Use only node.roles" noDowngradesMsg = "Downgrades are not supported" nodeRolesInOldVersionMsg = "node.roles setting is not available in this version of Elasticsearch" parseStoredVersionErrMsg = "Cannot parse current Elasticsearch version. String format must be {major}.{minor}.{patch}[-{label}]" parseVersionErrMsg = "Cannot parse Elasticsearch version. String format must be {major}.{minor}.{patch}[-{label}]" pvcNotMountedErrMsg = "volume claim declared but volume not mounted in any container. Note that the Elasticsearch data volume should be named 'elasticsearch-data'" unsupportedConfigErrMsg = "Configuration setting is reserved for internal use. User-configured use is unsupported" unsupportedUpgradeMsg = "Unsupported version upgrade path. Check the Elasticsearch documentation for supported upgrade paths." unsupportedVersionMsg = "Unsupported version" notAllowedNodesLabelMsg = "Node label not in the exposed node labels list" unsupportedClientAuthenticationMsg = "Mandatory client authentication is not supported" autoscalingAnnotationUnsupportedErrMsg = "autoscaling annotation is no longer supported" ) type validation func(esv1.Elasticsearch) field.ErrorList type updateValidation func(esv1.Elasticsearch, esv1.Elasticsearch) field.ErrorList // updateValidations are the validation funcs that only apply to updates func updateValidations(ctx context.Context, k8sClient k8s.Client, validateStorageClass bool) []updateValidation { return []updateValidation{ noDowngrades, validUpgradePath, func(current esv1.Elasticsearch, proposed esv1.Elasticsearch) field.ErrorList { return validPVCModification(ctx, current, proposed, k8sClient, validateStorageClass) }, } } // validations are the validation funcs that apply to creates or updates func validations(ctx context.Context, checker license.Checker, exposedNodeLabels NodeLabels) []validation { return []validation{ func(proposed esv1.Elasticsearch) field.ErrorList { return validNodeLabels(proposed, exposedNodeLabels) }, noUnknownFields, validName, hasCorrectNodeRoles, supportedVersion, validSanIP, validAutoscalingConfiguration, validPVCNaming, validMonitoring, validAssociations, supportsRemoteClusterUsingAPIKey, func(proposed esv1.Elasticsearch) field.ErrorList { return validLicenseLevel(ctx, proposed, checker) }, } } func validNodeLabels(proposed esv1.Elasticsearch, exposedNodeLabels NodeLabels) field.ErrorList { var errs field.ErrorList for _, nodeLabel := range proposed.DownwardNodeLabels() { if exposedNodeLabels.IsAllowed(nodeLabel) { continue } errs = append( errs, field.Invalid( field.NewPath("metadata").Child("annotations", esv1.DownwardNodeLabelsAnnotation), nodeLabel, notAllowedNodesLabelMsg, ), ) } return errs } func check(es esv1.Elasticsearch, validations []validation) (string, field.ErrorList) { var errs field.ErrorList for _, val := range validations { if err := val(es); err != nil { errs = append(errs, err...) } } // check if Elasticsearch version is deprecated warnings, deprecatedErrors := commonv1.CheckDeprecatedStackVersion(es.Spec.Version) if len(deprecatedErrors) > 0 { errs = append(errs, deprecatedErrors...) } return warnings, errs } // noUnknownFields checks whether the last applied config annotation contains json with unknown fields. func noUnknownFields(es esv1.Elasticsearch) field.ErrorList { return commonv1.NoUnknownFields(&es, es.ObjectMeta) } // validName checks whether the name is valid. func validName(es esv1.Elasticsearch) field.ErrorList { var errs field.ErrorList if err := esv1.ValidateNames(es); err != nil { errs = append(errs, field.Invalid(field.NewPath("metadata").Child("name"), es.Name, fmt.Sprintf("%s: %s", invalidNamesErrMsg, err))) } return errs } func supportedVersion(es esv1.Elasticsearch) field.ErrorList { ver, err := version.Parse(es.Spec.Version) if err != nil { return field.ErrorList{field.Invalid(field.NewPath("spec").Child("version"), es.Spec.Version, parseVersionErrMsg)} } if v := esversion.SupportedVersions(ver); v != nil { if err := v.WithinRange(ver); err == nil { return field.ErrorList{} } } return field.ErrorList{field.Invalid(field.NewPath("spec").Child("version"), es.Spec.Version, unsupportedVersionMsg)} } func supportsRemoteClusterUsingAPIKey(es esv1.Elasticsearch) field.ErrorList { ver, err := version.Parse(es.Spec.Version) if err != nil { return field.ErrorList{field.Invalid(field.NewPath("spec").Child("version"), es.Spec.Version, parseVersionErrMsg)} } var errs field.ErrorList if es.Spec.RemoteClusterServer.Enabled && ver.LE(esv1.RemoteClusterAPIKeysMinVersion) { errs = append(errs, field.Invalid( field.NewPath("spec").Child("remoteClusterServer"), es.Spec.Version, fmt.Sprintf( "minimum required version for remote cluster server is %s but desired version is %s", esv1.RemoteClusterAPIKeysMinVersion, es.Spec.Version, ), )) } if es.HasRemoteClusterAPIKey() && ver.LE(esv1.RemoteClusterAPIKeysMinVersion) { errs = append(errs, field.Invalid( field.NewPath("spec").Child("remoteClusters").Child("*").Key("apiKey"), es.Spec.Version, fmt.Sprintf( "minimum required version for remote cluster using API keys is %s but desired version is %s", esv1.RemoteClusterAPIKeysMinVersion, es.Spec.Version, ), )) } return errs } // hasCorrectNodeRoles checks whether Elasticsearch node roles are correctly configured. // The rules are: // There must be at least one master node. // node.roles are only supported on Elasticsearch 7.9.0 and above func hasCorrectNodeRoles(es esv1.Elasticsearch) field.ErrorList { v, err := version.Parse(es.Spec.Version) if err != nil { return field.ErrorList{field.Invalid(field.NewPath("spec").Child("version"), es.Spec.Version, parseVersionErrMsg)} } seenMaster := false var errs field.ErrorList confField := func(index int) *field.Path { return field.NewPath("spec").Child("nodeSets").Index(index).Child("config") } for i, ns := range es.Spec.NodeSets { cfg := esv1.ElasticsearchSettings{} if err := esv1.UnpackConfig(ns.Config, v, &cfg); err != nil { errs = append(errs, field.Invalid(confField(i), ns.Config, cfgInvalidMsg)) continue } // check that node.roles is not used with an older Elasticsearch version if cfg.Node != nil && cfg.Node.Roles != nil && !v.GTE(version.From(7, 9, 0)) { errs = append(errs, field.Invalid(confField(i), ns.Config, nodeRolesInOldVersionMsg)) continue } // check that node.roles and node attributes are not mixed nodeRoleAttrs := getNodeRoleAttrs(cfg) if cfg.Node != nil && len(cfg.Node.Roles) > 0 && len(nodeRoleAttrs) > 0 { errs = append(errs, field.Forbidden(confField(i), fmt.Sprintf(mixedRoleConfigMsg, strings.Join(nodeRoleAttrs, ",")))) } // Check if this nodeSet has the master role. seenMaster = seenMaster || (cfg.Node.IsConfiguredWithRole(esv1.MasterRole) && !cfg.Node.IsConfiguredWithRole(esv1.VotingOnlyRole) && ns.Count > 0) } if !seenMaster { errs = append(errs, field.Required(field.NewPath("spec").Child("nodeSets"), masterRequiredMsg)) } return errs } func getNodeRoleAttrs(cfg esv1.ElasticsearchSettings) []string { var nodeRoleAttrs []string //nolint:nestif if cfg.Node != nil { if cfg.Node.Data != nil { nodeRoleAttrs = append(nodeRoleAttrs, esv1.NodeData) } if cfg.Node.Ingest != nil { nodeRoleAttrs = append(nodeRoleAttrs, esv1.NodeIngest) } if cfg.Node.Master != nil { nodeRoleAttrs = append(nodeRoleAttrs, esv1.NodeMaster) } if cfg.Node.ML != nil { nodeRoleAttrs = append(nodeRoleAttrs, esv1.NodeML) } if cfg.Node.RemoteClusterClient != nil { nodeRoleAttrs = append(nodeRoleAttrs, esv1.NodeRemoteClusterClient) } if cfg.Node.Transform != nil { nodeRoleAttrs = append(nodeRoleAttrs, esv1.NodeTransform) } if cfg.Node.VotingOnly != nil { nodeRoleAttrs = append(nodeRoleAttrs, esv1.NodeVotingOnly) } } return nodeRoleAttrs } func validSanIP(es esv1.Elasticsearch) field.ErrorList { var errs field.ErrorList selfSignedCerts := es.Spec.HTTP.TLS.SelfSignedCertificate if selfSignedCerts != nil { for _, san := range selfSignedCerts.SubjectAlternativeNames { if san.IP != "" { ip := netutil.IPToRFCForm(net.ParseIP(san.IP)) if ip == nil { errs = append(errs, field.Invalid(field.NewPath("spec").Child("http", "tls", "selfSignedCertificate", "subjectAlternativeNames"), san.IP, invalidSanIPErrMsg)) } } } } return errs } func checkNodeSetNameUniqueness(es esv1.Elasticsearch) field.ErrorList { var errs field.ErrorList nodeSets := es.Spec.NodeSets names := make(map[string]struct{}) duplicates := make(map[string]struct{}) for _, nodeSet := range nodeSets { if _, found := names[nodeSet.Name]; found { duplicates[nodeSet.Name] = struct{}{} } names[nodeSet.Name] = struct{}{} } for _, dupe := range duplicates { errs = append(errs, field.Invalid(field.NewPath("spec").Child("nodeSets"), dupe, duplicateNodeSets)) } return errs } func noDowngrades(current, proposed esv1.Elasticsearch) field.ErrorList { var errs field.ErrorList // allow disabling version validation if proposed.IsConfiguredToAllowDowngrades() { return errs } currentVer, err := version.Parse(current.Spec.Version) if err != nil { // this should not happen, since this is the already persisted version errs = append(errs, field.Invalid(field.NewPath("spec").Child("version"), current.Spec.Version, parseStoredVersionErrMsg)) } proposedVer, err := version.Parse(proposed.Spec.Version) if err != nil { errs = append(errs, field.Invalid(field.NewPath("spec").Child("version"), proposed.Spec.Version, parseVersionErrMsg)) } if len(errs) != 0 { return errs } if !proposedVer.GTE(currentVer) { errs = append(errs, field.Invalid(field.NewPath("spec").Child("version"), proposed.Spec.Version, noDowngradesMsg)) } return errs } func validUpgradePath(current, proposed esv1.Elasticsearch) field.ErrorList { var errs field.ErrorList currentVer, ferr := currentVersion(current) if ferr != nil { errs = append(errs, ferr) } proposedVer, err := version.Parse(proposed.Spec.Version) if err != nil { errs = append(errs, field.Invalid(field.NewPath("spec").Child("version"), proposed.Spec.Version, parseVersionErrMsg)) } if len(errs) != 0 { return errs } supportedVersions := esversion.SupportedVersions(proposedVer) if supportedVersions == nil { errs = append(errs, field.Invalid(field.NewPath("spec").Child("version"), proposed.Spec.Version, unsupportedVersionMsg)) return errs } err = supportedVersions.WithinRange(currentVer) if err != nil { errs = append(errs, field.Invalid(field.NewPath("spec").Child("version"), proposed.Spec.Version, unsupportedUpgradeMsg)) } return errs } func currentVersion(current esv1.Elasticsearch) (version.Version, *field.Error) { // we do not have a version in the status let's use the version in the current spec instead which will not reflect // actually running Pods but which is still better than no validation. if current.Status.Version == "" { currentVer, err := version.Parse(current.Spec.Version) if err != nil { // this should not happen, since this is the already persisted version return version.Version{}, field.Invalid(field.NewPath("spec").Child("version"), current.Spec.Version, parseStoredVersionErrMsg) } return currentVer, nil } // if available use the status version which reflects the lowest version currently running in the cluster currentVer, err := version.Parse(current.Status.Version) if err != nil { // this should not happen, since this is the version from the spec copied to the status by the operator return version.Version{}, field.Invalid(field.NewPath("status").Child("version"), current.Status.Version, parseStoredVersionErrMsg) } return currentVer, nil } func validMonitoring(es esv1.Elasticsearch) field.ErrorList { return stackmon.Validate(&es, es.Spec.Version, stackmon.MinStackVersion) } func validAssociations(es esv1.Elasticsearch) field.ErrorList { monitoringPath := field.NewPath("spec").Child("monitoring") err1 := commonv1.CheckAssociationRefs(monitoringPath.Child("metrics"), es.GetMonitoringMetricsRefs()...) err2 := commonv1.CheckAssociationRefs(monitoringPath.Child("logs"), es.GetMonitoringLogsRefs()...) return append(err1, err2...) } func validLicenseLevel(ctx context.Context, es esv1.Elasticsearch, checker license.Checker) field.ErrorList { var errs field.ErrorList ok, err := license.HasRequestedLicenseLevel(ctx, es.Annotations, checker) if err != nil { ulog.FromContext(ctx).Error(err, "while checking license level during validation") return nil // ignore the error here } if !ok { errs = append(errs, field.Invalid(field.NewPath("metadata").Child("annotations").Child(license.Annotation), "enterprise", "Enterprise license required but ECK operator is running on a Basic license")) } return errs }