pkg/admission/conf/am_conf.go (321 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package conf import ( "errors" "fmt" "regexp" "strconv" "strings" "go.uber.org/zap" v1 "k8s.io/api/core/v1" informersv1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" "github.com/apache/yunikorn-k8shim/pkg/common/constants" "github.com/apache/yunikorn-k8shim/pkg/common/utils" schedulerconf "github.com/apache/yunikorn-k8shim/pkg/conf" "github.com/apache/yunikorn-k8shim/pkg/locking" "github.com/apache/yunikorn-k8shim/pkg/log" ) const ( AdmissionControllerPrefix = "admissionController." WebHookPrefix = AdmissionControllerPrefix + "webHook." FilteringPrefix = AdmissionControllerPrefix + "filtering." AccessControlPrefix = AdmissionControllerPrefix + "accessControl." // webhook configuration AMWebHookAMServiceName = WebHookPrefix + "amServiceName" AMWebHookSchedulerServiceAddress = WebHookPrefix + "schedulerServiceAddress" // filtering configuration AMFilteringProcessNamespaces = FilteringPrefix + "processNamespaces" AMFilteringBypassNamespaces = FilteringPrefix + "bypassNamespaces" AMFilteringLabelNamespaces = FilteringPrefix + "labelNamespaces" AMFilteringNoLabelNamespaces = FilteringPrefix + "noLabelNamespaces" AMFilteringGenerateUniqueAppIds = FilteringPrefix + "generateUniqueAppId" // access control configuration AMAccessControlBypassAuth = AccessControlPrefix + "bypassAuth" AMAccessControlTrustControllers = AccessControlPrefix + "trustControllers" AMAccessControlSystemUsers = AccessControlPrefix + "systemUsers" AMAccessControlExternalUsers = AccessControlPrefix + "externalUsers" AMAccessControlExternalGroups = AccessControlPrefix + "externalGroups" ) const ( // webhook defaults DefaultWebHookAmServiceName = "yunikorn-admission-controller-service" DefaultWebHookSchedulerServiceAddress = "yunikorn-service:9080" // filtering defaults DefaultFilteringProcessNamespaces = "" DefaultFilteringBypassNamespaces = "^kube-system$" DefaultFilteringLabelNamespaces = "" DefaultFilteringNoLabelNamespaces = "" DefaultFilteringGenerateUniqueAppIds = false // access control defaults DefaultAccessControlBypassAuth = false DefaultAccessControlTrustControllers = true DefaultAccessControlSystemUsers = "^system:serviceaccount:kube-system:" DefaultAccessControlExternalUsers = "" DefaultAccessControlExternalGroups = "" ) type AdmissionControllerConf struct { namespace string kubeConfig string // mutable values require locking enableConfigHotRefresh bool policyGroup string amServiceName string schedulerServiceAddress string processNamespaces []*regexp.Regexp bypassNamespaces []*regexp.Regexp labelNamespaces []*regexp.Regexp noLabelNamespaces []*regexp.Regexp generateUniqueAppIds bool bypassAuth bool trustControllers bool systemUsers []*regexp.Regexp externalUsers []*regexp.Regexp externalGroups []*regexp.Regexp configMaps []*v1.ConfigMap lock locking.RWMutex } func NewAdmissionControllerConf(configMaps []*v1.ConfigMap) *AdmissionControllerConf { acc := &AdmissionControllerConf{ namespace: schedulerconf.GetSchedulerNamespace(), kubeConfig: schedulerconf.GetDefaultKubeConfigPath(), } acc.updateConfigMaps(configMaps, true) return acc } func (acc *AdmissionControllerConf) RegisterHandlers(configMaps informersv1.ConfigMapInformer) error { _, err := configMaps.Informer().AddEventHandler(&configMapUpdateHandler{conf: acc}) if err != nil { return errors.Join(errors.New("failed to create register handlers: "), err) } return nil } func (acc *AdmissionControllerConf) GetNamespace() string { return acc.namespace } func (acc *AdmissionControllerConf) GetKubeConfig() string { return acc.kubeConfig } func (acc *AdmissionControllerConf) GetEnableConfigHotRefresh() bool { acc.lock.RLock() defer acc.lock.RUnlock() return acc.enableConfigHotRefresh } func (acc *AdmissionControllerConf) GetPolicyGroup() string { acc.lock.RLock() defer acc.lock.RUnlock() return acc.policyGroup } func GetPendingPolicyGroup(configs map[string]string) string { return parseConfigString(configs, schedulerconf.CMSvcPolicyGroup, schedulerconf.DefaultPolicyGroup) } func (acc *AdmissionControllerConf) GetAmServiceName() string { acc.lock.RLock() defer acc.lock.RUnlock() return acc.amServiceName } func (acc *AdmissionControllerConf) GetSchedulerServiceAddress() string { acc.lock.RLock() defer acc.lock.RUnlock() return acc.schedulerServiceAddress } func (acc *AdmissionControllerConf) GetProcessNamespaces() []*regexp.Regexp { acc.lock.RLock() defer acc.lock.RUnlock() return acc.processNamespaces } func (acc *AdmissionControllerConf) GetBypassNamespaces() []*regexp.Regexp { acc.lock.RLock() defer acc.lock.RUnlock() return acc.bypassNamespaces } func (acc *AdmissionControllerConf) GetLabelNamespaces() []*regexp.Regexp { acc.lock.RLock() defer acc.lock.RUnlock() return acc.labelNamespaces } func (acc *AdmissionControllerConf) GetNoLabelNamespaces() []*regexp.Regexp { acc.lock.RLock() defer acc.lock.RUnlock() return acc.noLabelNamespaces } func (acc *AdmissionControllerConf) GetGenerateUniqueAppIds() bool { acc.lock.RLock() defer acc.lock.RUnlock() return acc.generateUniqueAppIds } func (acc *AdmissionControllerConf) GetBypassAuth() bool { acc.lock.RLock() defer acc.lock.RUnlock() return acc.bypassAuth } func (acc *AdmissionControllerConf) GetTrustControllers() bool { acc.lock.RLock() defer acc.lock.RUnlock() return acc.trustControllers } func (acc *AdmissionControllerConf) GetSystemUsers() []*regexp.Regexp { acc.lock.RLock() defer acc.lock.RUnlock() return acc.systemUsers } func (acc *AdmissionControllerConf) GetExternalUsers() []*regexp.Regexp { acc.lock.RLock() defer acc.lock.RUnlock() return acc.externalUsers } func (acc *AdmissionControllerConf) GetExternalGroups() []*regexp.Regexp { acc.lock.RLock() defer acc.lock.RUnlock() return acc.externalGroups } type configMapUpdateHandler struct { conf *AdmissionControllerConf } func (h *configMapUpdateHandler) OnAdd(obj interface{}, _ bool) { cm := utils.Convert2ConfigMap(obj) if idx, ok := h.configMapIndex(cm); ok { h.conf.configUpdated(idx, cm) } } func (h *configMapUpdateHandler) OnUpdate(_, newObj interface{}) { cm := utils.Convert2ConfigMap(newObj) if idx, ok := h.configMapIndex(cm); ok { h.conf.configUpdated(idx, cm) } } func (h *configMapUpdateHandler) OnDelete(obj interface{}) { var cm *v1.ConfigMap switch t := obj.(type) { case *v1.ConfigMap: cm = t case cache.DeletedFinalStateUnknown: cm = utils.Convert2ConfigMap(obj) default: log.Log(log.AdmissionConf).Warn("unable to convert to configmap") return } if idx, ok := h.configMapIndex(cm); ok { h.conf.configUpdated(idx, nil) } } func (h *configMapUpdateHandler) configMapIndex(configMap *v1.ConfigMap) (int, bool) { if configMap == nil { return -1, false } if configMap.Namespace != h.conf.namespace { return -1, false } switch configMap.Name { case constants.DefaultConfigMapName: return 0, true case constants.ConfigMapName: return 1, true default: return -1, false } } func (acc *AdmissionControllerConf) configUpdated(index int, configMap *v1.ConfigMap) { configMaps := acc.GetConfigMaps() configMaps[index] = configMap acc.updateConfigMaps(configMaps, false) } func (acc *AdmissionControllerConf) GetConfigMaps() []*v1.ConfigMap { acc.lock.RLock() defer acc.lock.RUnlock() result := make([]*v1.ConfigMap, 0) result = append(result, acc.configMaps...) return result } func (acc *AdmissionControllerConf) updateConfigMaps(configMaps []*v1.ConfigMap, initial bool) { acc.lock.Lock() defer acc.lock.Unlock() // check for enable config hot refresh if !initial && !acc.enableConfigHotRefresh { log.Log(log.AdmissionConf).Warn("Config hot-refresh is disabled, ignoring configuration update") return } acc.configMaps = configMaps configs := schedulerconf.FlattenConfigMaps(configMaps) // hot refresh acc.enableConfigHotRefresh = parseConfigBool(configs, schedulerconf.CMSvcEnableConfigHotRefresh, schedulerconf.DefaultEnableConfigHotRefresh) // scheduler acc.policyGroup = parseConfigString(configs, schedulerconf.CMSvcPolicyGroup, schedulerconf.DefaultPolicyGroup) // webhook acc.amServiceName = parseConfigString(configs, AMWebHookAMServiceName, DefaultWebHookAmServiceName) acc.schedulerServiceAddress = parseConfigString(configs, AMWebHookSchedulerServiceAddress, DefaultWebHookSchedulerServiceAddress) // filtering acc.processNamespaces = parseConfigRegexps(configs, AMFilteringProcessNamespaces, DefaultFilteringProcessNamespaces) acc.bypassNamespaces = parseConfigRegexps(configs, AMFilteringBypassNamespaces, DefaultFilteringBypassNamespaces) acc.labelNamespaces = parseConfigRegexps(configs, AMFilteringLabelNamespaces, DefaultFilteringLabelNamespaces) acc.noLabelNamespaces = parseConfigRegexps(configs, AMFilteringNoLabelNamespaces, DefaultFilteringNoLabelNamespaces) acc.generateUniqueAppIds = parseConfigBool(configs, AMFilteringGenerateUniqueAppIds, DefaultFilteringGenerateUniqueAppIds) // access control acc.bypassAuth = parseConfigBool(configs, AMAccessControlBypassAuth, DefaultAccessControlBypassAuth) acc.trustControllers = parseConfigBool(configs, AMAccessControlTrustControllers, DefaultAccessControlTrustControllers) acc.systemUsers = parseConfigRegexps(configs, AMAccessControlSystemUsers, DefaultAccessControlSystemUsers) acc.externalUsers = parseConfigRegexps(configs, AMAccessControlExternalUsers, DefaultAccessControlExternalUsers) acc.externalGroups = parseConfigRegexps(configs, AMAccessControlExternalGroups, DefaultAccessControlExternalGroups) // logging log.UpdateLoggingConfig(configs) acc.dumpConfigurationInternal() } func (acc *AdmissionControllerConf) DumpConfiguration() { acc.lock.RLock() defer acc.lock.RUnlock() acc.dumpConfigurationInternal() } func (acc *AdmissionControllerConf) dumpConfigurationInternal() { log.Log(log.AdmissionConf).Info("Loaded admission controller configuration", zap.String("namespace", acc.namespace), zap.String("kubeConfig", acc.kubeConfig), zap.String("policyGroup", acc.policyGroup), zap.String("amServiceName", acc.amServiceName), zap.String("schedulerServiceAddress", acc.schedulerServiceAddress), zap.Strings("processNamespaces", regexpsString(acc.processNamespaces)), zap.Strings("bypassNamespaces", regexpsString(acc.bypassNamespaces)), zap.Strings("labelNamespaces", regexpsString(acc.labelNamespaces)), zap.Strings("noLabelNamespaces", regexpsString(acc.noLabelNamespaces)), zap.Bool("bypassAuth", acc.bypassAuth), zap.Bool("trustControllers", acc.trustControllers), zap.Strings("systemUsers", regexpsString(acc.systemUsers)), zap.Strings("externalUsers", regexpsString(acc.externalUsers)), zap.Strings("externalGroups", regexpsString(acc.externalGroups))) } func regexpsString(regexes []*regexp.Regexp) []string { result := make([]string, 0) for _, regex := range regexes { result = append(result, regex.String()) } return result } func parseConfigRegexps(config map[string]string, key string, defaultValue string) []*regexp.Regexp { value := parseConfigString(config, key, defaultValue) result, err := parseRegexes(value) if err != nil { log.Log(log.AdmissionConf).Error("Unable to parse regex values, using default", zap.String("key", key), zap.String("value", value), zap.String("default", defaultValue), zap.Error(err)) result, err = parseRegexes(defaultValue) if err != nil { log.Log(log.AdmissionConf).Fatal("BUG: can't parse default regex pattern", zap.Error(err)) } } return result } func parseConfigBool(config map[string]string, key string, defaultValue bool) bool { value := parseConfigString(config, key, fmt.Sprintf("%t", defaultValue)) result, err := strconv.ParseBool(value) if err != nil { log.Log(log.AdmissionConf).Error("Unable to parse bool value, using default", zap.String("key", key), zap.String("value", value), zap.Bool("default", defaultValue), zap.Error(err)) result = defaultValue } return result } func parseConfigString(config map[string]string, key string, defaultValue string) string { if value, ok := config[key]; ok { return value } return defaultValue } func parseRegexes(patterns string) ([]*regexp.Regexp, error) { result := make([]*regexp.Regexp, 0) for _, pattern := range strings.Split(patterns, ",") { pattern = strings.TrimSpace(pattern) if len(pattern) == 0 { continue } re, err := regexp.Compile(pattern) if err != nil { log.Log(log.AdmissionConf).Error("Unable to compile regular expression", zap.String("pattern", pattern), zap.Error(err)) return nil, err } result = append(result, re) } return result, nil }