pkg/admission/admission_controller.go (566 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
"regexp"
"strings"
"go.uber.org/zap"
admissionv1 "k8s.io/api/admission/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
"github.com/apache/yunikorn-k8shim/pkg/admission/common"
"github.com/apache/yunikorn-k8shim/pkg/admission/conf"
"github.com/apache/yunikorn-k8shim/pkg/admission/metadata"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
schedulerconf "github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
// Identifiers used when exchanging AdmissionReview objects with the API server
// and when calling the scheduler.
const (
	// yunikornPod is the value of the app label that marks YuniKorn's own pods;
	// those pods are admitted without any patch.
	yunikornPod = "yunikorn"

	// API version and kind stamped on every AdmissionReview returned by Serve.
	admissionReviewAPIVersion = "admission.k8s.io/v1"
	admissionReviewKind       = "AdmissionReview"

	// schedulerValidateConfURLPattern is formatted with the scheduler service
	// address to obtain the configuration validation endpoint.
	schedulerValidateConfURLPattern = "http://%s/ws/v1/validate-conf"

	// HTTP paths served by this webhook.
	mutateURL       = "/mutate"
	validateConfURL = "/validate-conf"
)
// runtimeScheme is the scheme backing the codec factory used to decode
// incoming AdmissionReview payloads.
var runtimeScheme = runtime.NewScheme()

// codecs is the codec factory built on runtimeScheme.
var codecs = serializer.NewCodecFactory(runtimeScheme)

// deserializer decodes request bodies in Serve.
var deserializer = codecs.UniversalDeserializer()
// AdmissionController implements the YuniKorn admission webhook. It mutates
// pod and workload requests (mutate) and validates changes to the YuniKorn
// configmaps (validateConf).
type AdmissionController struct {
	// conf holds the admission controller configuration: namespace regexp
	// lists, the bypass-auth flag, system users, the scheduler address, etc.
	conf *conf.AdmissionControllerConf
	// pcCache is consulted to decide the pod preemption annotation.
	pcCache *PriorityClassCache
	// nsCache supplies namespace-level overrides (enableYuniKorn,
	// generateAppID) that take precedence over the regexp lists.
	nsCache *NamespaceCache
	// annotationHandler validates and generates the user info annotation.
	annotationHandler *metadata.UserGroupAnnotationHandler
	// labelExtractor reads labels from non-pod workload requests. It is not
	// set by InitAdmissionController, so its zero value is used.
	labelExtractor metadata.LabelExtractor
}
// ValidateConfResponse is the JSON body returned by the scheduler's
// validate-conf endpoint (see schedulerValidateConfURLPattern).
type ValidateConfResponse struct {
	// Allowed reports whether the scheduler accepted the configuration.
	Allowed bool `json:"allowed"`
	// Reason explains a rejection; validateConfigMap turns it into the
	// admission error when Allowed is false.
	Reason string `json:"reason"`
}
// InitAdmissionController creates an AdmissionController from the given
// configuration and caches. The user/group annotation handler is derived from
// conf. labelExtractor is left at its zero value.
func InitAdmissionController(conf *conf.AdmissionControllerConf, pcCache *PriorityClassCache, nsCache *NamespaceCache) *AdmissionController {
	controller := new(AdmissionController)
	controller.conf = conf
	controller.pcCache = pcCache
	controller.nsCache = nsCache
	controller.annotationHandler = metadata.NewUserGroupAnnotationHandler(conf)

	log.Log(log.Admission).Info("Initialized YuniKorn Admission Controller")
	return controller
}
// parseRegexes compiles a comma-separated list of regular expressions.
//
// Each entry is trimmed of surrounding whitespace and empty entries are
// skipped, so an empty input yields an empty, non-nil slice. The first entry
// that fails to compile is logged, and its error is returned with a nil slice.
func parseRegexes(patterns string) ([]*regexp.Regexp, error) {
	compiled := make([]*regexp.Regexp, 0)
	for _, entry := range strings.Split(patterns, ",") {
		expr := strings.TrimSpace(entry)
		if expr == "" {
			continue
		}
		re, compileErr := regexp.Compile(expr)
		if compileErr != nil {
			log.Log(log.Admission).Error("Unable to compile regular expression", zap.String("pattern", expr), zap.Error(compileErr))
			return nil, compileErr
		}
		compiled = append(compiled, re)
	}
	return compiled, nil
}
// admissionResponseBuilder assembles the AdmissionResponse for the request
// identified by uid.
//
// A non-empty resultMessage becomes the status message. A non-empty patch is
// attached as a JSON Patch. Empty values leave Result, Patch and PatchType
// unset.
func admissionResponseBuilder(uid string, allowed bool, resultMessage string, patch []byte) *admissionv1.AdmissionResponse {
	response := &admissionv1.AdmissionResponse{
		UID:     types.UID(uid),
		Allowed: allowed,
	}
	if resultMessage != "" {
		response.Result = &metav1.Status{Message: resultMessage}
	}
	if len(patch) > 0 {
		patchType := admissionv1.PatchTypeJSONPatch
		response.Patch = patch
		response.PatchType = &patchType
	}
	return response
}
// mutate handles requests sent to the /mutate endpoint.
//
// A nil request is denied. An empty namespace is treated as "default". For
// UPDATE operations only pods are inspected (processPodUpdate), and every
// other kind is allowed unchanged. For all other operations, pods go to
// processPod and everything else goes to processWorkload.
func (c *AdmissionController) mutate(req *admissionv1.AdmissionRequest) *admissionv1.AdmissionResponse {
	if req == nil {
		log.Log(log.Admission).Warn("empty request received")
		return admissionResponseBuilder("", false, "", nil)
	}

	namespace := req.Namespace
	if namespace == "" {
		namespace = "default"
	}

	log.Log(log.Admission).Info("AdmissionReview",
		zap.String("Namespace", namespace),
		zap.String("UID", string(req.UID)),
		zap.String("Operation", string(req.Operation)),
		zap.String("Kind", req.Kind.Kind),
		zap.Any("UserInfo", req.UserInfo))

	isUpdate := req.Operation == admissionv1.Update
	isPod := req.Kind.Kind == metadata.Pod
	switch {
	case isUpdate && isPod:
		return c.processPodUpdate(req, namespace)
	case isUpdate:
		// resource types other than pods are ignored for UPDATE operations
		return admissionResponseBuilder(string(req.UID), true, "", nil)
	case isPod:
		return c.processPod(req, namespace)
	default:
		return c.processWorkload(req, namespace)
	}
}
// processPod builds the mutation for a pod request that is not an UPDATE.
//
// Steps, in order:
//   - deny the request if the pod cannot be decoded, or if a user info
//     annotation is present but rejected by checkUserInfoAnnotation;
//   - prepare a user info annotation patch when none is present and
//     authentication is not bypassed;
//   - admit YuniKorn's own pods (app label "yunikorn") and pods in namespaces
//     that are not processed, without any patch (the user info patch prepared
//     above is dropped in that case);
//   - otherwise set the scheduler name, then either label the pod and set its
//     preemption annotation, or mark it to be ignored as an application when
//     the namespace must not be labelled.
//
// The collected operations are returned as a JSON Patch.
func (c *AdmissionController) processPod(req *admissionv1.AdmissionRequest, namespace string) *admissionv1.AdmissionResponse {
	uid := string(req.UID)

	var pod v1.Pod
	if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
		log.Log(log.Admission).Error("unmarshal failed", zap.Error(err))
		return admissionResponseBuilder(uid, false, err.Error(), nil)
	}

	user, userGroups := req.UserInfo.Username, req.UserInfo.Groups
	denied, hasUserInfo := c.checkUserInfoAnnotation(func() (string, bool) {
		value, found := pod.Annotations[common.UserInfoAnnotation]
		return value, found
	}, user, userGroups, uid)
	if denied != nil {
		return denied
	}

	var ops []common.PatchOperation
	if !hasUserInfo && !c.conf.GetBypassAuth() {
		log.Log(log.Admission).Info("setting user info metadata on pod")
		userInfoOp, err := c.annotationHandler.GetPatchForPod(pod.Annotations, user, userGroups)
		if err != nil {
			return admissionResponseBuilder(uid, false, err.Error(), nil)
		}
		ops = append(ops, *userInfoOp)
	}

	// YuniKorn's own pods are admitted untouched.
	if utils.GetPodLabelValue(&pod, constants.LabelApp) == yunikornPod {
		log.Log(log.Admission).Info("ignore yunikorn pod")
		return admissionResponseBuilder(uid, true, "", nil)
	}

	if !c.shouldProcessNamespace(namespace) {
		log.Log(log.Admission).Info("bypassing namespace", zap.String("namespace", namespace))
		return admissionResponseBuilder(uid, true, "", nil)
	}

	ops = updateSchedulerName(ops)
	if c.shouldLabelNamespace(namespace) {
		ops = c.updateLabels(namespace, &pod, ops)
		ops = c.updatePreemptionInfo(&pod, ops)
	} else {
		ops = disableYuniKorn(namespace, &pod, ops)
	}

	log.Log(log.Admission).Info("generated patch",
		zap.String("namespace", namespace),
		zap.String("podName", pod.Name),
		zap.String("generateName", pod.GenerateName),
		zap.Any("patch", ops))

	patchBytes, err := json.Marshal(ops)
	if err != nil {
		log.Log(log.Admission).Error("failed to marshal patch", zap.Error(err))
		return admissionResponseBuilder(uid, false, err.Error(), nil)
	}
	return admissionResponseBuilder(uid, true, "", patchBytes)
}
// processWorkload handles non-pod requests that are not UPDATEs.
//
// The request is admitted untouched in four cases:
//   - the workload is skipped by shouldProcessWorkload;
//   - its kind is not supported by the label extractor or the annotation
//     handler;
//   - shouldProcessAdmissionReview does not select it;
//   - it already carries an acceptable user info annotation, or
//     authentication is bypassed.
//
// Errors while extracting labels or annotations, or while generating the
// patch, deny the request. Otherwise the patch generated by the annotation
// handler from the submitter's user name and groups is returned.
func (c *AdmissionController) processWorkload(req *admissionv1.AdmissionRequest, namespace string) *admissionv1.AdmissionResponse {
	uid := string(req.UID)
	if !c.shouldProcessWorkload(req) {
		return admissionResponseBuilder(uid, true, "", nil)
	}

	labels, known, err := c.labelExtractor.GetLabelsFromWorkload(req)
	switch {
	case !known:
		// Unknown request kind - pass
		return admissionResponseBuilder(uid, true, "", nil)
	case err != nil:
		return admissionResponseBuilder(uid, false, err.Error(), nil)
	}

	if !c.shouldProcessAdmissionReview(namespace, labels) {
		log.Log(log.Admission).Info("bypassing namespace", zap.String("namespace", namespace))
		return admissionResponseBuilder(uid, true, "", nil)
	}

	annotations, known, err := c.annotationHandler.GetAnnotationsFromRequestKind(req)
	switch {
	case !known:
		// Unknown request kind - pass
		return admissionResponseBuilder(uid, true, "", nil)
	case err != nil:
		return admissionResponseBuilder(uid, false, err.Error(), nil)
	}

	user, userGroups := req.UserInfo.Username, req.UserInfo.Groups
	denied, hasUserInfo := c.checkUserInfoAnnotation(func() (string, bool) {
		value, found := annotations[common.UserInfoAnnotation]
		return value, found
	}, user, userGroups, uid)
	if denied != nil {
		return denied
	}
	if hasUserInfo || c.conf.GetBypassAuth() {
		return admissionResponseBuilder(uid, true, "", nil)
	}

	ops, err := c.annotationHandler.GetPatchForWorkload(req, user, userGroups)
	if err != nil {
		log.Log(log.Admission).Error("could not generate patch for workload", zap.Error(err))
		return admissionResponseBuilder(uid, false, err.Error(), nil)
	}
	patchBytes, err := json.Marshal(ops)
	if err != nil {
		log.Log(log.Admission).Error("failed to marshal patch", zap.Error(err))
		return admissionResponseBuilder(uid, false, err.Error(), nil)
	}
	log.Log(log.Admission).Info("updating annotations on workload", zap.String("type", req.Kind.Kind),
		zap.Any("generated patch", ops))
	return admissionResponseBuilder(uid, true, "", patchBytes)
}
// processPodUpdate validates an UPDATE of a pod and never produces a patch.
//
// YuniKorn's own pods, and pods not selected by shouldProcessAdmissionReview,
// are admitted. For all other pods the user info annotation must be identical
// on the old and new objects. Any difference, including adding or removing
// the annotation, denies the request. Undecodable objects are denied too.
func (c *AdmissionController) processPodUpdate(req *admissionv1.AdmissionRequest, namespace string) *admissionv1.AdmissionResponse {
	uid := string(req.UID)

	var updated, previous v1.Pod
	if err := json.Unmarshal(req.Object.Raw, &updated); err != nil {
		log.Log(log.Admission).Error("unmarshal failed", zap.Error(err))
		return admissionResponseBuilder(uid, false, err.Error(), nil)
	}
	if err := json.Unmarshal(req.OldObject.Raw, &previous); err != nil {
		log.Log(log.Admission).Error("unmarshal failed", zap.Error(err))
		return admissionResponseBuilder(uid, false, err.Error(), nil)
	}

	if updated.Labels[constants.LabelApp] == yunikornPod {
		log.Log(log.Admission).Info("pod update - ignore yunikorn pod")
		return admissionResponseBuilder(uid, true, "", nil)
	}

	if !c.shouldProcessAdmissionReview(namespace, updated.Labels) {
		log.Log(log.Admission).Info("pod update - bypassing namespace", zap.String("namespace", namespace))
		return admissionResponseBuilder(uid, true, "", nil)
	}

	before := previous.Annotations[common.UserInfoAnnotation]
	after := updated.Annotations[common.UserInfoAnnotation]
	log.Log(log.Admission).Debug("checking original and new pod annotation", zap.String("original", before),
		zap.String("new", after))
	if before == after {
		return admissionResponseBuilder(uid, true, "", nil)
	}
	return admissionResponseBuilder(uid, false, "user info annotation change is not allowed", nil)
}
// shouldProcessAdmissionReview reports whether an object in namespace should
// be handled. The namespace must be processed (shouldProcessNamespace). In
// addition, the object must carry an application ID label (YuniKorn or Spark),
// or the namespace must be one whose pods get labelled.
func (c *AdmissionController) shouldProcessAdmissionReview(namespace string, labels map[string]string) bool {
	if !c.shouldProcessNamespace(namespace) {
		return false
	}
	hasAppID := labels[constants.LabelApplicationID] != "" || labels[constants.SparkLabelAppID] != ""
	return hasAppID || c.shouldLabelNamespace(namespace)
}
// shouldProcessWorkload reports whether a workload request should be
// processed. It returns false only for a ReplicaSet whose submitter matches
// one of the configured system-user patterns.
func (c *AdmissionController) shouldProcessWorkload(req *admissionv1.AdmissionRequest) bool {
	if req.Kind.Kind != metadata.ReplicaSet {
		return true
	}
	// A ReplicaSet submitted by a K8s system user must not be touched:
	// changing its spec would cause an infinite loop of replicaset creation.
	for _, pattern := range c.conf.GetSystemUsers() {
		if !pattern.MatchString(req.UserInfo.Username) {
			continue
		}
		log.Log(log.Admission).Info("ReplicaSet was created by a system user, skipping it",
			zap.String("uid", string(req.UID)))
		return false
	}
	return true
}
// checkUserInfoAnnotation validates a user info annotation supplied by the
// submitter.
//
// getAnnotation returns the annotation value and whether it is present. The
// second result always reports that presence. When the annotation is present
// and authentication is not bypassed, the submitter (userName, groups) must
// be allowed to set it and its value must be valid. Otherwise a denying
// response for uid is returned as the first result. A nil first result means
// the request may proceed.
func (c *AdmissionController) checkUserInfoAnnotation(getAnnotation func() (string, bool), userName string, groups []string, uid string) (*admissionv1.AdmissionResponse, bool) {
	annotation, present := getAnnotation()
	if !present || c.conf.GetBypassAuth() {
		return nil, present
	}

	if !c.annotationHandler.IsAnnotationAllowed(userName, groups) {
		reason := fmt.Sprintf("user %s with groups [%s] is not allowed to set user annotation", userName,
			strings.Join(groups, ","))
		log.Log(log.Admission).Error("user info validation failed - submitter is not allowed to set user annotation",
			zap.String("user", userName),
			zap.Strings("groups", groups))
		return admissionResponseBuilder(uid, false, reason, nil), present
	}

	if err := c.annotationHandler.IsAnnotationValid(annotation); err != nil {
		log.Log(log.Admission).Error("invalid user info metadata", zap.Error(err))
		return admissionResponseBuilder(uid, false, err.Error(), nil), present
	}
	return nil, present
}
// updateSchedulerName appends a JSON Patch "add" operation that sets
// /spec/schedulerName to constants.SchedulerName, redirecting the pod to
// YuniKorn, and returns the extended patch.
func updateSchedulerName(patch []common.PatchOperation) []common.PatchOperation {
	log.Log(log.Admission).Info("updating scheduler name")
	schedulerOp := common.PatchOperation{
		Op:    "add",
		Path:  "/spec/schedulerName",
		Value: constants.SchedulerName,
	}
	return append(patch, schedulerOp)
}
// updatePreemptionInfo makes sure the pod carries the allow-preemption
// annotation.
//
// If the pod already has it set to constants.True or constants.False, patch
// is returned unchanged. Otherwise the value is derived from the pod's
// priority class via pcCache. If patch already holds an "add
// /metadata/annotations" operation whose value is a map[string]string, the
// annotation is written into that map in place. Otherwise a new operation,
// built from the pod's annotations, is appended.
func (c *AdmissionController) updatePreemptionInfo(pod *v1.Pod, patch []common.PatchOperation) []common.PatchOperation {
	switch utils.GetPodAnnotationValue(pod, constants.AnnotationAllowPreemption) {
	case constants.True, constants.False:
		// already set to a valid value
		return patch
	}

	allow := constants.False
	if c.pcCache.isPreemptSelfAllowed(pod.Spec.PriorityClassName) {
		allow = constants.True
	}
	log.Log(log.Admission).Info("updating pod preemption annotations",
		zap.String("podName", pod.Name),
		zap.String("allowPreemption", allow))

	// The map is shared with the queued operation, so writing into it updates
	// the patch without adding a competing operation on the same path.
	for i := range patch {
		if patch[i].Op != "add" || patch[i].Path != "/metadata/annotations" {
			continue
		}
		if existing, ok := patch[i].Value.(map[string]string); ok {
			existing[constants.AnnotationAllowPreemption] = allow
			return patch
		}
	}

	return append(patch, common.PatchOperation{
		Op:    "add",
		Path:  "/metadata/annotations",
		Value: updatePodAnnotation(pod, constants.AnnotationAllowPreemption, allow),
	})
}
// updateLabels appends an "add /metadata/labels" operation whose value comes
// from updatePodLabel. It is driven by the unique-app-ID and default-queue
// settings of the configuration. The extended patch is returned.
func (c *AdmissionController) updateLabels(namespace string, pod *v1.Pod, patch []common.PatchOperation) []common.PatchOperation {
	log.Log(log.Admission).Info("updating pod labels",
		zap.String("podName", pod.Name),
		zap.String("generateName", pod.GenerateName),
		zap.String("namespace", namespace),
		zap.Any("labels", pod.Labels))

	labels := updatePodLabel(pod, namespace, c.conf.GetGenerateUniqueAppIds(), c.conf.GetDefaultQueueName())
	return append(patch, common.PatchOperation{Op: "add", Path: "/metadata/labels", Value: labels})
}
// disableYuniKorn marks the pod with constants.AnnotationIgnoreApplication set
// to constants.True. It is used for processed namespaces whose pods must not
// be labelled. The extended patch is returned.
//
// JSON Patch "add" operations on the same path do not merge: a later one
// replaces the value of an earlier one. So when patch already contains an
// "add /metadata/annotations" operation with a map[string]string value (such
// as the user info annotation queued by processPod), the annotation is
// written into that map instead. Appending a second operation would silently
// drop the earlier annotations. Otherwise a new operation, built from the
// pod's current annotations, is appended.
func disableYuniKorn(namespace string, pod *v1.Pod, patch []common.PatchOperation) []common.PatchOperation {
	log.Log(log.Admission).Info("disabling yunikorn on pod since namespace is set to no-label",
		zap.String("podName", pod.Name),
		zap.String("generateName", pod.GenerateName),
		zap.String("namespace", namespace))

	for _, op := range patch {
		if op.Op != "add" || op.Path != "/metadata/annotations" {
			continue
		}
		// The map is shared with the queued operation, so this updates the patch.
		// A nil map cannot be written to; fall back to appending in that case.
		if annotations, ok := op.Value.(map[string]string); ok && annotations != nil {
			annotations[constants.AnnotationIgnoreApplication] = constants.True
			return patch
		}
	}

	result := updatePodAnnotation(pod, constants.AnnotationIgnoreApplication, constants.True)
	return append(patch, common.PatchOperation{
		Op:    "add",
		Path:  "/metadata/annotations",
		Value: result,
	})
}
// validateConf handles requests sent to the /validate-conf endpoint.
//
// A nil request is denied. Objects other than ConfigMaps are allowed. A
// ConfigMap is decoded, with an empty namespace treated as "default", and
// passed to validateConfigMap. Decoding or validation errors deny the request
// with the error message.
func (c *AdmissionController) validateConf(req *admissionv1.AdmissionRequest) *admissionv1.AdmissionResponse {
	if req == nil {
		log.Log(log.Admission).Warn("empty request received")
		return admissionResponseBuilder("", false, "", nil)
	}

	uid := string(req.UID)
	if kind := req.Kind.Kind; kind != "ConfigMap" {
		log.Log(log.Admission).Warn("request kind is not configmap", zap.String("requestKind", kind))
		return admissionResponseBuilder(uid, true, "", nil)
	}

	namespace := req.Namespace
	if namespace == "" {
		namespace = "default"
	}

	var configmap v1.ConfigMap
	if err := json.Unmarshal(req.Object.Raw, &configmap); err != nil {
		log.Log(log.Admission).Error("failed to unmarshal configmap", zap.Error(err))
		return admissionResponseBuilder(uid, false, err.Error(), nil)
	}

	// validate new/updated config map
	err := c.validateConfigMap(namespace, &configmap)
	if err == nil {
		return admissionResponseBuilder(uid, true, "", nil)
	}
	log.Log(log.Admission).Error("failed to validate yunikorn configs", zap.Error(err))
	return admissionResponseBuilder(uid, false, err.Error(), nil)
}
// namespaceMatchesProcessList reports whether namespace matches one of the
// configured process-namespace patterns. An empty pattern list matches every
// namespace.
func (c *AdmissionController) namespaceMatchesProcessList(namespace string) bool {
	patterns := c.conf.GetProcessNamespaces()
	matched := len(patterns) == 0
	for i := 0; !matched && i < len(patterns); i++ {
		matched = patterns[i].MatchString(namespace)
	}
	return matched
}
// namespaceMatchesBypassList reports whether namespace matches one of the
// configured bypass-namespace patterns. An empty list matches nothing.
func (c *AdmissionController) namespaceMatchesBypassList(namespace string) bool {
	matched := false
	for _, pattern := range c.conf.GetBypassNamespaces() {
		if matched = pattern.MatchString(namespace); matched {
			break
		}
	}
	return matched
}
// namespaceMatchesLabelList reports whether namespace matches one of the
// configured label-namespace patterns. An empty pattern list matches every
// namespace.
func (c *AdmissionController) namespaceMatchesLabelList(namespace string) bool {
	patterns := c.conf.GetLabelNamespaces()
	matched := len(patterns) == 0
	for i := 0; !matched && i < len(patterns); i++ {
		matched = patterns[i].MatchString(namespace)
	}
	return matched
}
// namespaceMatchesNoLabelList reports whether namespace matches one of the
// configured no-label-namespace patterns. An empty list matches nothing.
func (c *AdmissionController) namespaceMatchesNoLabelList(namespace string) bool {
	matched := false
	for _, pattern := range c.conf.GetNoLabelNamespaces() {
		if matched = pattern.MatchString(namespace); matched {
			break
		}
	}
	return matched
}
// shouldProcessNamespace reports whether pods in namespace must be redirected
// to the YuniKorn scheduler by setting their schedulerName.
//
// The namespace annotation, read through nsCache, is a tri-state and takes
// precedence: TRUE yields true and FALSE yields false. When it is UNSET, the
// decision falls back to the configured regular expressions: the namespace
// must match the process list and must not match the bypass list.
func (c *AdmissionController) shouldProcessNamespace(namespace string) bool {
	switch c.nsCache.enableYuniKorn(namespace) {
	case TRUE:
		return true
	case UNSET:
		return c.namespaceMatchesProcessList(namespace) && !c.namespaceMatchesBypassList(namespace)
	default:
		return false
	}
}
// shouldLabelNamespace reports whether pods in namespace must be labelled.
//
// The namespace annotation, read through nsCache, is a tri-state and takes
// precedence: TRUE or FALSE is returned as a boolean. When it is UNSET, the
// namespace must match the label list and must not match the no-label list.
func (c *AdmissionController) shouldLabelNamespace(namespace string) bool {
	if setting := c.nsCache.generateAppID(namespace); setting != UNSET {
		return setting == TRUE
	}
	inLabelList := c.namespaceMatchesLabelList(namespace)
	return inLabelList && !c.namespaceMatchesNoLabelList(namespace)
}
// validateConfigMap asks the YuniKorn scheduler whether the configuration that
// would result from accepting cm is valid.
//
// Configmaps outside the YuniKorn namespace, or whose name is neither
// constants.DefaultConfigMapName nor constants.ConfigMapName, are ignored. For
// a YuniKorn configmap, cm is substituted into a copy of the currently known
// configmaps and the result is flattened. The "<policyGroup>.yaml" entry is
// then POSTed to the scheduler's validate-conf endpoint.
//
// The check is fail-open. If the scheduler is unreachable, answers with a
// non-2xx status, or returns an unreadable response, the problem is logged
// and nil is returned. A non-nil error is returned only when the scheduler
// explicitly rejects the configuration; its message is the scheduler's reason.
func (c *AdmissionController) validateConfigMap(namespace string, cm *v1.ConfigMap) error {
	if namespace != c.conf.GetNamespace() {
		log.Log(log.Admission).Debug("Configmap does not belong to YuniKorn", zap.String("namespace", namespace), zap.String("Name", cm.Name))
		return nil
	}

	// Substitute the candidate into a private copy. The update has not been
	// accepted yet and must not leak into a slice the configuration object may
	// share (NOTE(review): depends on whether GetConfigMaps copies; copying
	// here is safe either way).
	// Index 0 holds constants.DefaultConfigMapName, index 1 constants.ConfigMapName.
	configMaps := append([]*v1.ConfigMap(nil), c.conf.GetConfigMaps()...)
	switch cm.Name {
	case constants.DefaultConfigMapName:
		configMaps[0] = cm
	case constants.ConfigMapName:
		configMaps[1] = cm
	default:
		log.Log(log.Admission).Debug("Configmap does not belong to YuniKorn", zap.String("namespace", namespace), zap.String("Name", cm.Name))
		return nil
	}

	configs := schedulerconf.FlattenConfigMaps(configMaps)
	policyGroup := conf.GetPendingPolicyGroup(configs)
	confKey := fmt.Sprintf("%s.yaml", policyGroup)
	content, ok := configs[confKey]
	if !ok {
		// content is "" for a missing entry, which is what gets validated.
		log.Log(log.Admission).Info("Configmap missing policygroup config, using default", zap.String("entry", confKey))
	}

	checksum := fmt.Sprintf("%X", sha256.Sum256([]byte(content)))
	log.Log(log.Admission).Info("Validating YuniKorn configuration", zap.String("checksum", checksum))
	log.Log(log.Admission).Debug("Configmap data", zap.ByteString("content", []byte(content)))

	validateURL := fmt.Sprintf(schedulerValidateConfURLPattern, c.conf.GetSchedulerServiceAddress())
	response, err := http.Post(validateURL, "application/json", bytes.NewBuffer([]byte(content)))
	if err != nil {
		log.Log(log.Admission).Error("YuniKorn scheduler is unreachable, assuming configmap is valid", zap.Error(err))
		return nil
	}
	defer response.Body.Close()

	if response.StatusCode < 200 || response.StatusCode > 299 {
		log.Log(log.Admission).Error("YuniKorn scheduler responded with unexpected status, assuming configmap is valid",
			zap.Int("status", response.StatusCode))
		return nil
	}

	responseBytes, err := io.ReadAll(response.Body)
	if err != nil {
		log.Log(log.Admission).Error("Unable to read response from YuniKorn scheduler, assuming configmap is valid", zap.Error(err))
		return nil
	}

	var responseData ValidateConfResponse
	if err = json.Unmarshal(responseBytes, &responseData); err != nil {
		log.Log(log.Admission).Error("Unable to parse response from YuniKorn scheduler, assuming configmap is valid", zap.Error(err))
		return nil
	}

	if !responseData.Allowed {
		// The reason is scheduler-supplied text, not a format string: passing it
		// as the format would garble any '%' it contains.
		err = fmt.Errorf("%s", responseData.Reason)
		log.Log(log.Admission).Error("Configmap validation failed, aborting", zap.Error(err))
		return err
	}

	log.Log(log.Admission).Info("Successfully validated YuniKorn configuration")
	return nil
}
// Health is the HTTP health-check handler. It always answers 200 with a
// plain-text "OK" body. A failure to write the body is only logged.
func (c *AdmissionController) Health(w http.ResponseWriter, r *http.Request) {
	// for now, always healthy
	w.Header().Set("Content-type", "text/plain")
	w.WriteHeader(http.StatusOK)
	if _, err := w.Write([]byte("OK\r\n")); err != nil {
		log.Log(log.Admission).Error("Unable to write health check result", zap.Error(err))
	}
}
// Serve is the HTTP handler for the webhook endpoints (mutateURL and
// validateConfURL). It decodes an AdmissionReview from the request body and
// dispatches the embedded request to mutate or validateConf. It then writes
// back an AdmissionReview carrying the response as application/json.
//
// HTTP-level problems are answered with an HTTP error status:
//   - an unreadable or empty body (400);
//   - a Content-Type other than application/json (415);
//   - an unknown path (404).
//
// A body that cannot be decoded, or that carries no request, is answered with
// a denying AdmissionResponse instead.
func (c *AdmissionController) Serve(w http.ResponseWriter, r *http.Request) {
	// Log selected attributes only: *http.Request has func and chan fields, so
	// zap.Any on the whole struct cannot be JSON-encoded.
	log.Log(log.Admission).Debug("request",
		zap.String("method", r.Method),
		zap.String("urlPath", r.URL.Path),
		zap.String("contentType", r.Header.Get("Content-Type")),
		zap.Int64("contentLength", r.ContentLength))

	var body []byte
	if r.Body != nil {
		var err error
		body, err = io.ReadAll(r.Body)
		if err != nil || len(body) == 0 {
			log.Log(log.Admission).Debug("illegal request received: body invalid", zap.Error(err))
			http.Error(w, "empty or invalid body", http.StatusBadRequest)
			return
		}
	}

	// verify the content type is accurate
	contentType := r.Header.Get("Content-Type")
	if contentType != "application/json" {
		log.Log(log.Admission).Debug("illegal request received: invalid content type", zap.String("requested content type", contentType))
		http.Error(w, "invalid Content-Type, expect `application/json`", http.StatusUnsupportedMediaType)
		return
	}

	urlPath := r.URL.Path
	if urlPath != mutateURL && urlPath != validateConfURL {
		log.Log(log.Admission).Debug("unsupported request received", zap.String("urlPath", urlPath))
		http.Error(w, "request is neither mutation nor validation", http.StatusNotFound)
		return
	}

	ar := admissionv1.AdmissionReview{TypeMeta: metav1.TypeMeta{
		APIVersion: admissionReviewAPIVersion,
		Kind:       admissionReviewKind,
	}}
	var admissionResponse *admissionv1.AdmissionResponse
	_, _, err := deserializer.Decode(body, nil, &ar)
	if err != nil || ar.Request == nil {
		log.Log(log.Admission).Error("request body decode failed or request empty", zap.Error(err))
		admissionResponse = admissionResponseBuilder("yunikorn-invalid-body", false, "body decode failed", nil)
	} else {
		req := ar.Request
		switch urlPath {
		case mutateURL:
			admissionResponse = c.mutate(req)
		case validateConfURL:
			admissionResponse = c.validateConf(req)
		}
	}

	admissionReview := admissionv1.AdmissionReview{
		TypeMeta: metav1.TypeMeta{
			APIVersion: admissionReviewAPIVersion,
			Kind:       admissionReviewKind,
		},
		Response: admissionResponse,
	}

	resp, err := json.Marshal(admissionReview)
	if err != nil {
		errMessage := fmt.Sprintf("could not encode response: %v", err)
		log.Log(log.Admission).Error(errMessage)
		http.Error(w, errMessage, http.StatusInternalServerError)
		// There is no valid body to send; do not fall through to writing one.
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if _, err = w.Write(resp); err != nil {
		// The status line, and possibly part of the body, is already on the
		// wire. http.Error would only trigger a superfluous WriteHeader, so
		// logging is all that can be done.
		log.Log(log.Admission).Error(fmt.Sprintf("could not write response: %v", err))
	}
}