internal/kubelet/config.go (463 lines of code) (raw):

package kubelet import ( "context" _ "embed" "encoding/json" "fmt" "io" "math" "net" "net/url" "os" "path" "path/filepath" "regexp" "strings" "time" "dario.cat/mergo" "github.com/aws/aws-sdk-go-v2/feature/ec2/imds" "github.com/aws/smithy-go/ptr" "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/mod/semver" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8skubelet "k8s.io/kubelet/config/v1beta1" "sigs.k8s.io/yaml" "github.com/aws/eks-hybrid/internal/api" "github.com/aws/eks-hybrid/internal/containerd" "github.com/aws/eks-hybrid/internal/system" "github.com/aws/eks-hybrid/internal/util" ) const ( kubeletConfigRoot = "/etc/kubernetes/kubelet" kubeletConfigFile = "config.json" kubeletConfigDir = "config.json.d" kubeletConfigPerm = 0o644 hybridNodeLabel = "eks.amazonaws.com/compute-type=hybrid" credentialProviderLabelKey = "eks.amazonaws.com/hybrid-credential-provider" hybridProviderIdPrefix = "eks-hybrid" ) var nodeNameProviderIdRegexPattern = regexp.MustCompile(`^eks-hybrid:///[^/]+/[^/]+/(.+)$`) func (k *kubelet) writeKubeletConfig() error { kubeletVersion, err := GetKubeletVersion() if err != nil { return err } // tracking: https://github.com/kubernetes/enhancements/issues/3983 // for enabling drop-in configuration if semver.Compare(kubeletVersion, "v1.29.0") < 0 { return k.writeKubeletConfigToFile() } else { return k.writeKubeletConfigToDir() } } // kubeletConfig is an internal-only representation of the kubelet configuration // that is generated using sane defaults for EKS. 
It is a subset of the upstream // KubeletConfiguration types: // https://pkg.go.dev/k8s.io/kubelet/config/v1beta1#KubeletConfiguration type kubeletConfig struct { Address string `json:"address"` Authentication k8skubelet.KubeletAuthentication `json:"authentication"` Authorization k8skubelet.KubeletAuthorization `json:"authorization"` CgroupDriver string `json:"cgroupDriver"` CgroupRoot string `json:"cgroupRoot"` ClusterDNS []string `json:"clusterDNS"` ClusterDomain string `json:"clusterDomain"` ContainerRuntimeEndpoint string `json:"containerRuntimeEndpoint"` EvictionHard map[string]string `json:"evictionHard,omitempty"` FeatureGates map[string]bool `json:"featureGates"` HairpinMode string `json:"hairpinMode"` KubeAPIBurst *int `json:"kubeAPIBurst,omitempty"` KubeAPIQPS *int `json:"kubeAPIQPS,omitempty"` KubeReserved map[string]string `json:"kubeReserved,omitempty"` KubeReservedCgroup *string `json:"kubeReservedCgroup,omitempty"` Logging loggingConfiguration `json:"logging"` MaxPods int32 `json:"maxPods,omitempty"` ProtectKernelDefaults bool `json:"protectKernelDefaults"` ProviderID *string `json:"providerID,omitempty"` ReadOnlyPort int `json:"readOnlyPort"` RegisterWithTaints []v1.Taint `json:"registerWithTaints,omitempty"` SerializeImagePulls bool `json:"serializeImagePulls"` ServerTLSBootstrap bool `json:"serverTLSBootstrap"` SystemReservedCgroup *string `json:"systemReservedCgroup,omitempty"` TLSCipherSuites []string `json:"tlsCipherSuites"` ResolvConf string `json:"resolvConf,omitempty"` metav1.TypeMeta `json:",inline"` } type loggingConfiguration struct { Verbosity int `json:"verbosity"` } // Creates an internal kubelet configuration from the public facing bootstrap // kubelet configuration with additional sane defaults. 
func defaultKubeletSubConfig() kubeletConfig {
	var cfg kubeletConfig

	cfg.TypeMeta = metav1.TypeMeta{
		Kind:       "KubeletConfiguration",
		APIVersion: "kubelet.config.k8s.io/v1beta1",
	}

	// Listen on every interface and keep the unauthenticated read-only port off.
	cfg.Address = "0.0.0.0"
	cfg.ReadOnlyPort = 0

	// Authentication: anonymous requests rejected, webhook token review on,
	// and x509 client certificates verified against the cluster CA bundle.
	cfg.Authentication.Anonymous.Enabled = ptr.Bool(false)
	cfg.Authentication.Webhook.Enabled = ptr.Bool(true)
	cfg.Authentication.Webhook.CacheTTL = metav1.Duration{Duration: 2 * time.Minute}
	cfg.Authentication.X509.ClientCAFile = caCertificatePath

	// Authorization is delegated to the API server via webhook.
	cfg.Authorization.Mode = "Webhook"
	cfg.Authorization.Webhook.CacheAuthorizedTTL = metav1.Duration{Duration: 5 * time.Minute}
	cfg.Authorization.Webhook.CacheUnauthorizedTTL = metav1.Duration{Duration: 30 * time.Second}

	cfg.CgroupDriver = "systemd"
	cfg.CgroupRoot = "/"
	cfg.ClusterDomain = "cluster.local"
	cfg.ContainerRuntimeEndpoint = containerd.ContainerRuntimeEndpoint
	cfg.HairpinMode = "hairpin-veth"
	cfg.ProtectKernelDefaults = true
	cfg.SerializeImagePulls = false
	cfg.ServerTLSBootstrap = true
	cfg.Logging.Verbosity = 2

	cfg.EvictionHard = map[string]string{
		"memory.available":  "100Mi",
		"nodefs.available":  "10%",
		"nodefs.inodesFree": "5%",
	}
	cfg.FeatureGates = map[string]bool{
		"RotateKubeletServerCertificate": true,
	}
	cfg.TLSCipherSuites = []string{
		"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
		"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
		"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
		"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
		"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
		"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
		"TLS_RSA_WITH_AES_128_GCM_SHA256",
		"TLS_RSA_WITH_AES_256_GCM_SHA384",
	}

	return cfg
}

// withFallbackClusterDns sets ClusterDNS to the single address returned by
// cluster.GetClusterDns, which derives it heuristically from the cluster's
// service IP CIDR.
func (ksc *kubeletConfig) withFallbackClusterDns(cluster *api.ClusterDetails) error { clusterDns, err := cluster.GetClusterDns() if err != nil { return err } ksc.ClusterDNS = []string{clusterDns} return nil } // To support worker nodes to continue to communicate and connect to local cluster even when the Outpost // is disconnected from the parent AWS Region, the following specific setup are required: // - append entries to /etc/hosts with the mappings of control plane host IP address and API server // domain name. So that the domain name can be resolved to IP addresses locally. // - use aws-iam-authenticator as bootstrap auth for kubelet TLS bootstrapping which downloads client // X.509 certificate and generate kubelet kubeconfig file which uses the client cert. So that the // worker node can be authentiacated through X.509 certificate which works for both connected and // disconnected state. func (ksc *kubeletConfig) withOutpostSetup(cfg *api.NodeConfig) error { if enabled := cfg.Spec.Cluster.EnableOutpost; enabled != nil && *enabled { zap.L().Info("Setting up outpost..") if cfg.Spec.Cluster.ID == "" { return fmt.Errorf("clusterId cannot be empty when outpost is enabled.") } apiUrl, err := url.Parse(cfg.Spec.Cluster.APIServerEndpoint) if err != nil { return err } // TODO: cleanup ipAddresses, err := net.LookupHost(apiUrl.Host) if err != nil { return err } var ipHostMappings []string for _, ip := range ipAddresses { ipHostMappings = append(ipHostMappings, fmt.Sprintf("%s\t%s", ip, apiUrl.Host)) } output := strings.Join(ipHostMappings, "\n") + "\n" // append to /etc/hosts file with shuffled mappings of "IP address to API server domain name" f, err := os.OpenFile("/etc/hosts", os.O_APPEND|os.O_WRONLY, kubeletConfigPerm) if err != nil { return err } defer f.Close() if _, err := f.WriteString(output); err != nil { return err } } return nil } func (ksc *kubeletConfig) withNodeIp(cfg *api.NodeConfig, flags map[string]string) error { nodeIp, err := 
getNodeIp(context.TODO(), imds.New(imds.Options{}), cfg) if err != nil { return err } flags["node-ip"] = nodeIp zap.L().Info("Setup IP for node", zap.String("ip", nodeIp)) return nil } func (ksc *kubeletConfig) withResolvConf(resolvConfPath string) { ksc.ResolvConf = resolvConfPath } func (ksc *kubeletConfig) withVersionToggles(kubeletVersion string, flags map[string]string) { // TODO: remove when 1.26 is EOL if semver.Compare(kubeletVersion, "v1.27.0") < 0 { // --container-runtime flag is gone in 1.27+ flags["container-runtime"] = "remote" // --container-runtime-endpoint moved to kubelet config start from 1.27 // https://github.com/kubernetes/kubernetes/blob/master/CHANGELOG/CHANGELOG-1.27.md?plain=1#L1800-L1801 flags["container-runtime-endpoint"] = ksc.ContainerRuntimeEndpoint } // TODO: Remove this during 1.27 EOL // Enable Feature Gate for KubeletCredentialProviders in versions less than 1.28 since this feature flag was removed in 1.28. if semver.Compare(kubeletVersion, "v1.28.0") < 0 { ksc.FeatureGates["KubeletCredentialProviders"] = true } // for K8s versions that suport API Priority & Fairness, increase our API server QPS // in 1.27, the default is already increased to 50/100, so use the higher defaults if semver.Compare(kubeletVersion, "v1.22.0") >= 0 && semver.Compare(kubeletVersion, "v1.27.0") < 0 { ksc.KubeAPIQPS = ptr.Int(10) ksc.KubeAPIBurst = ptr.Int(20) } } func (ksc *kubeletConfig) withCloudProvider(kubeletVersion string, cfg *api.NodeConfig, flags map[string]string) { if semver.Compare(kubeletVersion, "v1.26.0") >= 0 { // ref: https://github.com/kubernetes/kubernetes/pull/121367 flags["cloud-provider"] = "external" // provider ID needs to be specified when the cloud provider is external ksc.ProviderID = ptr.String(getProviderId(cfg.Status.Instance.AvailabilityZone, cfg.Status.Instance.ID)) // the name of the Node object must equal the EC2 PrivateDnsName // see: https://github.com/awslabs/amazon-eks-ami/pull/1264 flags["hostname-override"] = 
cfg.Status.Instance.PrivateDNSName } else { flags["cloud-provider"] = "aws" } } // withHybridCloudProvider sets the cloud-provider to "" and sets the appropriate provider-id for the node func (ksc *kubeletConfig) withHybridCloudProvider(cfg *api.NodeConfig, flags map[string]string) { flags["cloud-provider"] = "" // provider ID needs to be specified when the cloud provider is external or empty string ksc.ProviderID = ptr.String(getHybridProviderId(cfg)) // hostname is overridden to the node name provided in the spec flags["hostname-override"] = cfg.Status.Hybrid.NodeName } func (ksc *kubeletConfig) withHybridNodeLabels(cfg *api.NodeConfig, flags map[string]string) { var labels []string labels = append(labels, hybridNodeLabel) labels = append(labels, fmt.Sprintf("%s=%s", credentialProviderLabelKey, cfg.GetNodeType())) flags["node-labels"] = strings.Join(labels, ",") } // When the DefaultReservedResources flag is enabled, override the kubelet // config with reserved cgroup values on behalf of the user func (ksc *kubeletConfig) withDefaultReservedResources(cfg *api.NodeConfig) { ksc.SystemReservedCgroup = ptr.String("/system") ksc.KubeReservedCgroup = ptr.String("/runtime") maxPods, ok := MaxPodsPerInstanceType[cfg.Status.Instance.Type] if !ok { ksc.MaxPods = CalcMaxPods(cfg.Status.Instance.Region, cfg.Status.Instance.Type) } else { ksc.MaxPods = int32(maxPods) } ksc.KubeReserved = map[string]string{ "cpu": fmt.Sprintf("%dm", getCPUMillicoresToReserve()), "ephemeral-storage": "1Gi", "memory": fmt.Sprintf("%dMi", getMemoryMebibytesToReserve(ksc.MaxPods)), } } // withHybridReservedResources reserves cpu and memory according to below computation // for kubelet in order for safe cluster management operation func (ksc *kubeletConfig) withHybridReservedResources() error { ksc.SystemReservedCgroup = ptr.String("/system") ksc.KubeReservedCgroup = ptr.String("/runtime") // calculate kube reserved memory totalMemory, err := system.GetMachineMemoryCapacity() if err != nil { 
return err } // Convert bytes to GiB totalMemoryGiB := totalMemory / (1024 * 1024 * 1024) var reserveMemoryString string // For memory resources, nodeadm will reserve according to the following table for hybrid nodes // 255 MiB when total memory is < 1GiB // 25% of first 4GiB of total memory // 20% of next 4GiB of total memory // 10% of next 8 GiB of total memory // 6% of next 112 GiB of total memory // 2% of remaining total memory switch { case totalMemoryGiB < 1: reserveMemoryString = fmt.Sprintf("%dMi", 255) case totalMemoryGiB < 4: reserveMemoryString = fmt.Sprintf("%dGi", int(math.Round(float64(totalMemoryGiB)*0.25))) case totalMemoryGiB < 8: reserveMemoryString = fmt.Sprintf("%dGi", int(math.Round((0.25*4)+float64(totalMemoryGiB-4)*0.2))) case totalMemoryGiB < 16: reserveMemoryString = fmt.Sprintf("%dGi", int(math.Round((0.25*4)+(0.20*4)+float64(totalMemoryGiB-8)*0.1))) case totalMemoryGiB <= 128: reserveMemoryString = fmt.Sprintf("%dGi", int(math.Round((0.25*4)+(0.20*4)+(0.10*8)+float64(totalMemoryGiB-16)*0.06))) case totalMemoryGiB > 128: reserveMemoryString = fmt.Sprintf("%dGi", int(math.Round((0.25*4)+(0.20*4)+(0.10*8)+(0.06*112)+float64(totalMemoryGiB-128)*0.02))) } ksc.KubeReserved = map[string]string{ "cpu": fmt.Sprintf("%dm", getCPUMillicoresToReserve()), "ephemeral-storage": "1Gi", "memory": reserveMemoryString, } return nil } // withPodInfraContainerImage determines whether to add the // '--pod-infra-container-image' flag, which is used to ensure the sandbox image // is not garbage collected. // // TODO: revisit once the minimum supportted version catches up or the container // runtime is moved to containerd 2.0 func (ksc *kubeletConfig) withPodInfraContainerImage(cfg *api.NodeConfig, kubeletVersion string, flags map[string]string) error { // the flag is a noop on 1.29+, since the behavior was changed to use the // CRI image pinning behavior and no longer considers the flag value. 
// see: https://github.com/kubernetes/kubernetes/pull/118544 if semver.Compare(kubeletVersion, "v1.29.0") < 0 { flags["pod-infra-container-image"] = cfg.Status.Defaults.SandboxImage } return nil } func (k *kubelet) GenerateKubeletConfig() (*kubeletConfig, error) { // Get the kubelet/kubernetes version to help conditionally enable features kubeletVersion, err := GetKubeletVersion() if err != nil { return nil, err } zap.L().Info("Detected kubelet version", zap.String("version", kubeletVersion)) kubeletConfig := defaultKubeletSubConfig() if err := kubeletConfig.withFallbackClusterDns(&k.nodeConfig.Spec.Cluster); err != nil { return nil, err } if err := kubeletConfig.withOutpostSetup(k.nodeConfig); err != nil { return nil, err } if err := kubeletConfig.withPodInfraContainerImage(k.nodeConfig, kubeletVersion, k.flags); err != nil { return nil, err } kubeletConfig.withVersionToggles(kubeletVersion, k.flags) if k.nodeConfig.IsHybridNode() { kubeletConfig.withHybridCloudProvider(k.nodeConfig, k.flags) kubeletConfig.withHybridNodeLabels(k.nodeConfig, k.flags) if err := kubeletConfig.withHybridReservedResources(); err != nil { return nil, err } // On Ubuntu, systemd-resolved adds loopback address as nameserver to /etc/resolv.conf // This causes pods not being able to do successful dns lookups // Setting Kubelet config to point to the right resolv.conf file // https://coredns.io/plugins/loop/#troubleshooting-loops-in-kubernetes-clusters if system.GetOsName() == system.UbuntuOsName { kubeletConfig.withResolvConf(system.UbuntuResolvConfPath) } } else { if err := kubeletConfig.withNodeIp(k.nodeConfig, k.flags); err != nil { return nil, err } kubeletConfig.withCloudProvider(kubeletVersion, k.nodeConfig, k.flags) kubeletConfig.withDefaultReservedResources(k.nodeConfig) } return &kubeletConfig, nil } // WriteConfig writes the kubelet config to a file. // This should only be used for kubelet versions < 1.28. 
func (k *kubelet) writeKubeletConfigToFile() error { kubeletConfig, err := k.GenerateKubeletConfig() if err != nil { return err } var kubeletConfigBytes []byte if len(k.nodeConfig.Spec.Kubelet.Config) > 0 { mergedMap, err := util.DocumentMerge(kubeletConfig, k.nodeConfig.Spec.Kubelet.Config, mergo.WithOverride) if err != nil { return err } if kubeletConfigBytes, err = json.MarshalIndent(mergedMap, "", strings.Repeat(" ", 4)); err != nil { return err } } else { var err error if kubeletConfigBytes, err = json.MarshalIndent(kubeletConfig, "", strings.Repeat(" ", 4)); err != nil { return err } } configPath := path.Join(kubeletConfigRoot, kubeletConfigFile) k.flags["config"] = configPath zap.L().Info("Writing kubelet config to file..", zap.String("path", configPath)) return util.WriteFileWithDir(configPath, kubeletConfigBytes, kubeletConfigPerm) } // WriteKubeletConfigToDir writes nodeadm's generated kubelet config to the // standard config file and writes the user's provided config to a directory for // drop-in support. This is only supported on kubelet versions >= 1.28. 
see: // https://kubernetes.io/docs/tasks/administer-cluster/kubelet-config-file/#kubelet-conf-d func (k *kubelet) writeKubeletConfigToDir() error { kubeletConfig, err := k.GenerateKubeletConfig() if err != nil { return err } kubeletConfigBytes, err := json.MarshalIndent(kubeletConfig, "", strings.Repeat(" ", 4)) if err != nil { return err } configPath := path.Join(kubeletConfigRoot, kubeletConfigFile) k.flags["config"] = configPath zap.L().Info("Writing kubelet config to file..", zap.String("path", configPath)) if err := util.WriteFileWithDir(configPath, kubeletConfigBytes, kubeletConfigPerm); err != nil { return err } if len(k.nodeConfig.Spec.Kubelet.Config) > 0 { dirPath := path.Join(kubeletConfigRoot, kubeletConfigDir) k.flags["config-dir"] = dirPath zap.L().Info("Enabling kubelet config drop-in dir..") k.setEnv("KUBELET_CONFIG_DROPIN_DIR_ALPHA", "on") filePath := path.Join(dirPath, "00-nodeadm.conf") // merge in default type metadata like kind and apiVersion in case the // user has not specified this, as it is required to qualify a drop-in // config as a valid KubeletConfiguration userKubeletConfigMap, err := util.DocumentMerge(defaultKubeletSubConfig().TypeMeta, k.nodeConfig.Spec.Kubelet.Config) if err != nil { return err } zap.L().Info("Writing user kubelet config to drop-in file..", zap.String("path", filePath)) userKubeletConfigBytes, err := json.MarshalIndent(userKubeletConfigMap, "", strings.Repeat(" ", 4)) if err != nil { return err } if err := util.WriteFileWithDir(filePath, userKubeletConfigBytes, kubeletConfigPerm); err != nil { return err } } return nil } func getProviderId(availabilityZone, instanceId string) string { return fmt.Sprintf("aws:///%s/%s", availabilityZone, instanceId) } func getHybridProviderId(cfg *api.NodeConfig) string { return fmt.Sprintf("%s:///%s/%s/%s", hybridProviderIdPrefix, cfg.Spec.Cluster.Region, cfg.Spec.Cluster.Name, cfg.Status.Hybrid.NodeName) } // Get the IP of the node depending on the ipFamily configured for the 
cluster func getNodeIp(ctx context.Context, imdsClient *imds.Client, cfg *api.NodeConfig) (string, error) { ipFamily, err := api.GetCIDRIpFamily(cfg.Spec.Cluster.CIDR) if err != nil { return "", err } switch ipFamily { case api.IPFamilyIPv4: ipv4Response, err := imdsClient.GetMetadata(ctx, &imds.GetMetadataInput{ Path: "local-ipv4", }) if err != nil { return "", err } ip, err := io.ReadAll(ipv4Response.Content) if err != nil { return "", err } return string(ip), nil case api.IPFamilyIPv6: ipv6Response, err := imdsClient.GetMetadata(ctx, &imds.GetMetadataInput{ Path: fmt.Sprintf("network/interfaces/macs/%s/ipv6s", cfg.Status.Instance.MAC), }) if err != nil { return "", err } ip, err := io.ReadAll(ipv6Response.Content) if err != nil { return "", err } return string(ip), nil default: return "", fmt.Errorf("invalid ip-family. %s is not one of %v", ipFamily, []api.IPFamily{api.IPFamilyIPv4, api.IPFamilyIPv6}) } } func getCPUMillicoresToReserve() int { totalCPUMillicores, err := system.GetMilliNumCores() if err != nil { zap.L().Error("Error found when GetMilliNumCores", zap.Error(err)) return 0 } cpuRanges := []int{0, 1000, 2000, 4000, totalCPUMillicores} cpuPercentageReservedForRanges := []int{600, 100, 50, 25} cpuToReserve := 0 for i, percentageToReserveForRange := range cpuPercentageReservedForRanges { startRange := cpuRanges[i] endRange := cpuRanges[i+1] cpuToReserve += getResourceToReserveInRange(totalCPUMillicores, startRange, endRange, percentageToReserveForRange) } return cpuToReserve } // getResourceToReserveInRange calculates the CPU resources to reserve for a given range. 
func getResourceToReserveInRange(totalCPU, startRange, endRange, percentage int) int { if totalCPU <= startRange { return 0 } reserved := totalCPU if reserved > endRange { reserved = endRange } return (reserved - startRange) * percentage / 10000 } func getMemoryMebibytesToReserve(maxPods int32) int32 { return 11*maxPods + 255 } func getKubeletConfigFromDisk() (*kubeletConfig, error) { data, err := os.ReadFile(filepath.Join(kubeletConfigRoot, kubeletConfigFile)) if err != nil { return nil, err } var kubeletConf kubeletConfig if err = yaml.Unmarshal(data, &kubeletConf); err != nil { return nil, err } return &kubeletConf, nil } // GetNodeName gets the current node name from the providerId in kubelet config func GetNodeName() (string, error) { kubeletConf, err := getKubeletConfigFromDisk() if err != nil { return "", errors.Wrap(err, "failed to get kubelet configuration from disk") } matches := nodeNameProviderIdRegexPattern.FindStringSubmatch(*kubeletConf.ProviderID) // matches have entire string, 1st match, 2nd match, etc if len(matches) > 1 { return matches[1], nil } return "", errors.New("failed to get node name from provider id") }