pkg/conf/schedulerconf.go

/*
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to you under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
*/

package conf

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"

	"github.com/apache/yunikorn-k8shim/pkg/common/constants"
	"github.com/apache/yunikorn-k8shim/pkg/locking"
	"github.com/apache/yunikorn-k8shim/pkg/log"
)

type SchedulerConfFactory = func() *SchedulerConf

const (
	// env vars
	EnvHome       = "HOME"
	EnvKubeConfig = "KUBECONFIG"
	EnvNamespace  = "NAMESPACE"

	// prefixes
	PrefixService             = "service."
	PrefixLog                 = "log."
	PrefixKubernetes          = "kubernetes."
	PrefixAdmissionController = "admissionController."

	// service
	CMSvcClusterID                    = PrefixService + "clusterId"
	CMSvcPolicyGroup                  = PrefixService + "policyGroup"
	CMSvcSchedulingInterval           = PrefixService + "schedulingInterval"
	CMSvcVolumeBindTimeout            = PrefixService + "volumeBindTimeout"
	CMSvcEventChannelCapacity         = PrefixService + "eventChannelCapacity"
	CMSvcDispatchTimeout              = PrefixService + "dispatchTimeout"
	CMSvcDisableGangScheduling        = PrefixService + "disableGangScheduling"
	CMSvcEnableConfigHotRefresh       = PrefixService + "enableConfigHotRefresh"
	CMSvcPlaceholderImage             = PrefixService + "placeholderImage"
	CMSvcNodeInstanceTypeNodeLabelKey = PrefixService + "nodeInstanceTypeNodeLabelKey"

	// kubernetes
	CMKubeQPS   = PrefixKubernetes + "qps"
	CMKubeBurst = PrefixKubernetes + "burst"

	// admission controller
	PrefixAMFiltering               = PrefixAdmissionController + "filtering."
	AMFilteringGenerateUniqueAppIds = PrefixAMFiltering + "generateUniqueAppId"

	// defaults
	DefaultNamespace                       = "default"
	DefaultClusterID                       = "mycluster"
	DefaultPolicyGroup                     = "queues"
	DefaultSchedulingInterval              = time.Second
	DefaultVolumeBindTimeout               = 10 * time.Minute
	DefaultEventChannelCapacity            = 1024 * 1024
	DefaultDispatchTimeout                 = 300 * time.Second
	DefaultOperatorPlugins                 = "general"
	DefaultDisableGangScheduling           = false
	DefaultEnableConfigHotRefresh          = true
	DefaultKubeQPS                         = 1000
	DefaultKubeBurst                       = 1000
	DefaultAMFilteringGenerateUniqueAppIds = false
)
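// For reference, the key constants above expand to dotted ConfigMap keys, e.g.
// CMSvcClusterID -> "service.clusterId", CMSvcSchedulingInterval -> "service.schedulingInterval",
// CMKubeQPS -> "kubernetes.qps" and
// AMFilteringGenerateUniqueAppIds -> "admissionController.filtering.generateUniqueAppId".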
var (
	buildVersion    string
	buildDate       string
	isPluginVersion string
	goVersion       string
	arch            string
	coreSHA         string
	siSHA           string
	shimSHA         string
)

var once sync.Once
var confHolder atomic.Value
var kubeLoggerOnce sync.Once

// SchedulerConf holds the active shim configuration; access is protected by the embedded RWMutex.
type SchedulerConf struct {
	SchedulerName            string        `json:"schedulerName"`
	ClusterID                string        `json:"clusterId"`
	ClusterVersion           string        `json:"clusterVersion"`
	PolicyGroup              string        `json:"policyGroup"`
	Interval                 time.Duration `json:"schedulingIntervalSecond"`
	KubeConfig               string        `json:"absoluteKubeConfigFilePath"`
	VolumeBindTimeout        time.Duration `json:"volumeBindTimeout"`
	EventChannelCapacity     int           `json:"eventChannelCapacity"`
	DispatchTimeout          time.Duration `json:"dispatchTimeout"`
	KubeQPS                  int           `json:"kubeQPS"`
	KubeBurst                int           `json:"kubeBurst"`
	EnableConfigHotRefresh   bool          `json:"enableConfigHotRefresh"`
	DisableGangScheduling    bool          `json:"disableGangScheduling"`
	UserLabelKey             string        `json:"userLabelKey"`
	PlaceHolderImage         string        `json:"placeHolderImage"`
	InstanceTypeNodeLabelKey string        `json:"instanceTypeNodeLabelKey"`
	Namespace                string        `json:"namespace"`
	GenerateUniqueAppIds     bool          `json:"generateUniqueAppIds"`

	locking.RWMutex
}

// Clone returns a copy of the configuration taken under the read lock.
func (conf *SchedulerConf) Clone() *SchedulerConf {
	conf.RLock()
	defer conf.RUnlock()

	return &SchedulerConf{
		SchedulerName:            conf.SchedulerName,
		ClusterID:                conf.ClusterID,
		ClusterVersion:           conf.ClusterVersion,
		PolicyGroup:              conf.PolicyGroup,
		Interval:                 conf.Interval,
		KubeConfig:               conf.KubeConfig,
		VolumeBindTimeout:        conf.VolumeBindTimeout,
		EventChannelCapacity:     conf.EventChannelCapacity,
		DispatchTimeout:          conf.DispatchTimeout,
		KubeQPS:                  conf.KubeQPS,
		KubeBurst:                conf.KubeBurst,
		EnableConfigHotRefresh:   conf.EnableConfigHotRefresh,
		DisableGangScheduling:    conf.DisableGangScheduling,
		UserLabelKey:             conf.UserLabelKey,
		PlaceHolderImage:         conf.PlaceHolderImage,
		InstanceTypeNodeLabelKey: conf.InstanceTypeNodeLabelKey,
		Namespace:                conf.Namespace,
		GenerateUniqueAppIds:     conf.GenerateUniqueAppIds,
	}
}

// UpdateConfigMaps parses the given ConfigMaps on top of the defaults, reverts any
// non-reloadable changes (unless this is the initial load) and installs the result.
func UpdateConfigMaps(configMaps []*v1.ConfigMap, initial bool) error {
	log.Log(log.ShimConfig).Info("reloading configuration")

	// start with defaults
	prev := CreateDefaultConfig()

	// flatten configmap entries to single map
	config := FlattenConfigMaps(configMaps)

	// parse values from configmaps
	newConf, cmErrors := parseConfig(config, prev)
	if cmErrors != nil {
		for _, err := range cmErrors {
			log.Log(log.ShimConfig).Error("failed to parse configmap entry", zap.Error(err))
		}
		return errors.New("failed to load configmap")
	}

	// check for settings which cannot be hot-reloaded
	if !initial {
		oldConf := GetSchedulerConf()
		handleNonReloadableConfig(oldConf, newConf)
	}

	// update scheduler config with merged version
	SetSchedulerConf(newConf)
	_ = GetSchedulerConf()

	// update logger configuration
	log.UpdateLoggingConfig(config)

	// update Kubernetes logger configuration
	updateKubeLogger()

	// dump new scheduler configuration
	DumpConfiguration()

	return nil
}
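// Illustrative sketch only (not called from this file): a caller that watches the
// scheduler ConfigMaps would typically pass them straight through, with dotted keys
// in Data selecting the fields above, e.g.
//
//	cm := &v1.ConfigMap{Data: map[string]string{
//		CMSvcClusterID:              "mycluster",
//		CMSvcEnableConfigHotRefresh: "true",
//	}}
//	if err := UpdateConfigMaps([]*v1.ConfigMap{cm}, true); err != nil {
//		log.Log(log.ShimConfig).Error("initial configuration load failed", zap.Error(err))
//	}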
func handleNonReloadableConfig(old *SchedulerConf, new *SchedulerConf) {
	// warn about and revert any settings which cannot be hot-reloaded
	checkNonReloadableString(CMSvcClusterID, &old.ClusterID, &new.ClusterID)
	checkNonReloadableString(CMSvcPolicyGroup, &old.PolicyGroup, &new.PolicyGroup)
	checkNonReloadableDuration(CMSvcSchedulingInterval, &old.Interval, &new.Interval)
	checkNonReloadableDuration(CMSvcVolumeBindTimeout, &old.VolumeBindTimeout, &new.VolumeBindTimeout)
	checkNonReloadableInt(CMSvcEventChannelCapacity, &old.EventChannelCapacity, &new.EventChannelCapacity)
	checkNonReloadableDuration(CMSvcDispatchTimeout, &old.DispatchTimeout, &new.DispatchTimeout)
	checkNonReloadableInt(CMKubeQPS, &old.KubeQPS, &new.KubeQPS)
	checkNonReloadableInt(CMKubeBurst, &old.KubeBurst, &new.KubeBurst)
	checkNonReloadableBool(CMSvcDisableGangScheduling, &old.DisableGangScheduling, &new.DisableGangScheduling)
	checkNonReloadableString(CMSvcPlaceholderImage, &old.PlaceHolderImage, &new.PlaceHolderImage)
	checkNonReloadableString(CMSvcNodeInstanceTypeNodeLabelKey, &old.InstanceTypeNodeLabelKey, &new.InstanceTypeNodeLabelKey)
	checkNonReloadableBool(AMFilteringGenerateUniqueAppIds, &old.GenerateUniqueAppIds, &new.GenerateUniqueAppIds)
}

const warningNonReloadable = "ignoring non-reloadable configuration change (restart required to update)"

func checkNonReloadableString(name string, old *string, new *string) {
	if *old != *new {
		log.Log(log.ShimConfig).Warn(warningNonReloadable,
			zap.String("config", name),
			zap.String("existing", *old),
			zap.String("new", *new))
		*new = *old
	}
}

func checkNonReloadableDuration(name string, old *time.Duration, new *time.Duration) {
	if *old != *new {
		log.Log(log.ShimConfig).Warn(warningNonReloadable,
			zap.String("config", name),
			zap.Duration("existing", *old),
			zap.Duration("new", *new))
		*new = *old
	}
}

func checkNonReloadableInt(name string, old *int, new *int) {
	if *old != *new {
		log.Log(log.ShimConfig).Warn(warningNonReloadable,
			zap.String("config", name),
			zap.Int("existing", *old),
			zap.Int("new", *new))
		*new = *old
	}
}

func checkNonReloadableBool(name string, old *bool, new *bool) {
	if *old != *new {
		log.Log(log.ShimConfig).Warn(warningNonReloadable,
			zap.String("config", name),
			zap.Bool("existing", *old),
			zap.Bool("new", *new))
		*new = *old
	}
}

func GetSchedulerConf() *SchedulerConf {
	once.Do(createConfigs)
	return confHolder.Load().(*SchedulerConf) //nolint:errcheck
}

func SetSchedulerConf(conf *SchedulerConf) {
	// this is just to ensure that the original is in place first
	once.Do(createConfigs)
	confHolder.Store(conf)
}

func (conf *SchedulerConf) IsConfigReloadable() bool {
	conf.RLock()
	defer conf.RUnlock()
	return conf.EnableConfigHotRefresh
}

func (conf *SchedulerConf) GetSchedulingInterval() time.Duration {
	conf.RLock()
	defer conf.RUnlock()
	return conf.Interval
}

func (conf *SchedulerConf) GetKubeConfigPath() string {
	conf.RLock()
	defer conf.RUnlock()
	return conf.KubeConfig
}

func GetSchedulerNamespace() string {
	if value, ok := os.LookupEnv(EnvNamespace); ok {
		return value
	}
	return DefaultNamespace
}

func createConfigs() {
	confHolder.Store(CreateDefaultConfig())
}

func GetDefaultKubeConfigPath() string {
	conf, ok := os.LookupEnv(EnvKubeConfig)
	if ok {
		return conf
	}
	home, ok := os.LookupEnv(EnvHome)
	if !ok {
		home = ""
	}
	return fmt.Sprintf("%s/.kube/config", home)
}
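// Usage sketch (illustrative only): callers obtain the current configuration through
// the GetSchedulerConf singleton accessor and read values via the lock-protected
// getters defined above, e.g.
//
//	interval := GetSchedulerConf().GetSchedulingInterval()
//	kubeConfigPath := GetSchedulerConf().GetKubeConfigPath()
//	if GetSchedulerConf().IsConfigReloadable() {
//		// config hot refresh is enabled
//	}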
// CreateDefaultConfig creates and returns a configuration representing all default values
func CreateDefaultConfig() *SchedulerConf {
	return &SchedulerConf{
		SchedulerName:            constants.SchedulerName,
		Namespace:                GetSchedulerNamespace(),
		ClusterID:                DefaultClusterID,
		ClusterVersion:           buildVersion,
		PolicyGroup:              DefaultPolicyGroup,
		Interval:                 DefaultSchedulingInterval,
		KubeConfig:               GetDefaultKubeConfigPath(),
		VolumeBindTimeout:        DefaultVolumeBindTimeout,
		EventChannelCapacity:     DefaultEventChannelCapacity,
		DispatchTimeout:          DefaultDispatchTimeout,
		KubeQPS:                  DefaultKubeQPS,
		KubeBurst:                DefaultKubeBurst,
		EnableConfigHotRefresh:   DefaultEnableConfigHotRefresh,
		DisableGangScheduling:    DefaultDisableGangScheduling,
		UserLabelKey:             constants.DefaultUserLabel,
		PlaceHolderImage:         constants.PlaceholderContainerImage,
		InstanceTypeNodeLabelKey: constants.DefaultNodeInstanceTypeNodeLabelKey,
		GenerateUniqueAppIds:     DefaultAMFilteringGenerateUniqueAppIds,
	}
}

func parseConfig(config map[string]string, prev *SchedulerConf) (*SchedulerConf, []error) {
	conf := prev.Clone()
	if len(config) == 0 {
		// no changes
		return conf, nil
	}

	parser := newConfigParser(config)

	// service
	parser.stringVar(&conf.ClusterID, CMSvcClusterID)
	parser.stringVar(&conf.PolicyGroup, CMSvcPolicyGroup)
	parser.durationVar(&conf.Interval, CMSvcSchedulingInterval)
	parser.durationVar(&conf.VolumeBindTimeout, CMSvcVolumeBindTimeout)
	parser.intVar(&conf.EventChannelCapacity, CMSvcEventChannelCapacity)
	parser.durationVar(&conf.DispatchTimeout, CMSvcDispatchTimeout)
	parser.boolVar(&conf.DisableGangScheduling, CMSvcDisableGangScheduling)
	parser.boolVar(&conf.EnableConfigHotRefresh, CMSvcEnableConfigHotRefresh)
	parser.stringVar(&conf.PlaceHolderImage, CMSvcPlaceholderImage)
	parser.stringVar(&conf.InstanceTypeNodeLabelKey, CMSvcNodeInstanceTypeNodeLabelKey)

	// kubernetes
	parser.intVar(&conf.KubeQPS, CMKubeQPS)
	parser.intVar(&conf.KubeBurst, CMKubeBurst)

	// admission controller
	parser.boolVar(&conf.GenerateUniqueAppIds, AMFilteringGenerateUniqueAppIds)

	if len(parser.errors) > 0 {
		return nil, parser.errors
	}

	return conf, nil
}

// configParser reads typed values out of the flattened config map and collects any parse errors.
type configParser struct {
	errors []error
	config map[string]string
}

func newConfigParser(config map[string]string) *configParser {
	return &configParser{
		errors: make([]error, 0),
		config: config,
	}
}

func (cp *configParser) stringVar(p *string, name string) {
	if newValue, ok := cp.config[name]; ok {
		*p = newValue
	}
}

func (cp *configParser) intVar(p *int, name string) {
	if newValue, ok := cp.config[name]; ok {
		int64Value, err := strconv.ParseInt(newValue, 10, 32)
		intValue := int(int64Value)
		if err != nil {
			log.Log(log.ShimConfig).Error("Unable to parse configmap entry",
				zap.String("key", name),
				zap.String("value", newValue),
				zap.Error(err))
			cp.errors = append(cp.errors, err)
			return
		}
		*p = intValue
	}
}

func (cp *configParser) boolVar(p *bool, name string) {
	if newValue, ok := cp.config[name]; ok {
		boolValue, err := strconv.ParseBool(newValue)
		if err != nil {
			log.Log(log.ShimConfig).Error("Unable to parse configmap entry",
				zap.String("key", name),
				zap.String("value", newValue),
				zap.Error(err))
			cp.errors = append(cp.errors, err)
			return
		}
		*p = boolValue
	}
}

func (cp *configParser) durationVar(p *time.Duration, name string) {
	if newValue, ok := cp.config[name]; ok {
		durationValue, err := time.ParseDuration(newValue)
		if err != nil {
			log.Log(log.ShimConfig).Error("Unable to parse configmap entry",
				zap.String("key", name),
				zap.String("value", newValue),
				zap.Error(err))
			cp.errors = append(cp.errors, err)
			return
		}
		*p = durationValue
	}
}
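// Value formats accepted by the parser above (an illustrative sketch based on the
// standard-library parsers it calls): durations go through time.ParseDuration
// ("100ms", "1s", "10m"), booleans through strconv.ParseBool ("true"/"false", "1"/"0")
// and integers through strconv.ParseInt with base 10 and a 32-bit size limit, e.g.
//
//	"service.schedulingInterval":     "1s"
//	"service.volumeBindTimeout":      "10m"
//	"service.enableConfigHotRefresh": "true"
//	"kubernetes.qps":                 "1000"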
func updateKubeLogger() {
	// if log level is debug, enable klog and set its log level verbosity to 4 (represents debug level),
	// For details refer to the Logging Conventions of klog at
	// https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md
	// danger, this can only be called once!
	kubeLoggerOnce.Do(func() {
		if log.Log(log.Kubernetes).Core().Enabled(zapcore.DebugLevel) {
			klog.InitFlags(nil)
			// cannot really handle the error here, ignore it
			//nolint:errcheck
			_ = flag.Set("v", "4")
		}
	})
}

func DumpConfiguration() {
	configs := GetSchedulerConf()
	c, err := json.MarshalIndent(configs, "", " ")
	logger := log.Log(log.ShimConfig)
	//nolint:errcheck
	defer logger.Sync()
	if err != nil {
		logger.Info("scheduler configuration, json conversion failed", zap.Any("configs", configs))
	} else {
		logger.Info("scheduler configuration, pretty print", zap.ByteString("configs", c))
	}
}

func Decompress(key string, value []byte) (string, string) {
	var uncompressedData string
	splitKey := strings.Split(key, ".")
	compressionAlgo := splitKey[len(splitKey)-1]
	if strings.EqualFold(compressionAlgo, constants.GzipSuffix) {
		gzReader, err := gzip.NewReader(bytes.NewReader(value))
		if err != nil {
			log.Log(log.ShimConfig).Error("failed to decompress decoded schedulerConfig entry", zap.Error(err))
			return "", ""
		}
		defer func() {
			if err = gzReader.Close(); err != nil {
				log.Log(log.ShimConfig).Debug("gzip Reader could not be closed", zap.Error(err))
			}
		}()
		decompressedBytes, err := io.ReadAll(gzReader)
		if err != nil {
			log.Log(log.ShimConfig).Error("failed to decompress decoded schedulerConfig entry", zap.Error(err))
			return "", ""
		}
		uncompressedData = string(decompressedBytes)
	}
	strippedKey, _ := strings.CutSuffix(key, "."+compressionAlgo)
	return strippedKey, uncompressedData
}

func FlattenConfigMaps(configMaps []*v1.ConfigMap) map[string]string {
	result := make(map[string]string)
	for _, configMap := range configMaps {
		if configMap != nil {
			for k, v := range configMap.Data {
				result[k] = v
			}
			for k, v := range configMap.BinaryData {
				strippedKey, uncompressedData := Decompress(k, v)
				result[strippedKey] = uncompressedData
			}
		}
	}
	return result
}

func GetBuildInfoMap() map[string]string {
	return map[string]string{
		"buildVersion":    buildVersion,
		"buildDate":       buildDate,
		"isPluginVersion": isPluginVersion,
		"goVersion":       goVersion,
		"arch":            arch,
		"coreSHA":         coreSHA,
		"siSHA":           siSHA,
		"shimSHA":         shimSHA,
	}
}

func GetBuildInfoString() string {
	return fmt.Sprintf("Build info: version=%s date=%s isPluginVersion=%s goVersion=%s arch=%s coreSHA=%s siSHA=%s shimSHA=%s",
		buildVersion, buildDate, isPluginVersion, goVersion, arch, coreSHA, siSHA, shimSHA)
}
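// Decompress/FlattenConfigMaps sketch (illustrative only; assumes constants.GzipSuffix
// names the "gzip" key suffix): a gzip-compressed entry in ConfigMap.BinaryData such as
// "queues.yaml.gzip" is gunzipped and stored in the flattened map under the stripped key
// "queues.yaml", while plain Data entries are copied through unchanged.
//
//	key, text := Decompress("queues.yaml.gzip", gzippedBytes)
//	// key == "queues.yaml", text == the decompressed YAML content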