pkg/sysched/sysched.go (306 lines of code) (raw):
package sysched
import (
"context"
"fmt"
"math"
"path"
"strings"
"github.com/containers/common/pkg/seccomp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
clientscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
"sigs.k8s.io/security-profiles-operator/api/seccompprofile/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config"
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
)
// SySched is a scheduler score plugin that tracks, per node, which pods are
// present and which system calls those pods' seccomp profiles cover.
//
// NOTE(review): no synchronization around these fields is visible in this
// file, although they are touched from both Score and the pod informer
// callbacks — confirm whether callers serialize access.
type SySched struct {
// handle is the scheduler framework handle; Score uses its clientset to look up nodes.
handle framework.Handle
// client is used to fetch SeccompProfile custom resources.
client client.Client
// HostToPods maintains the pods recorded on each node, keyed by node name.
// Cached state from SharedLister does not hold system-wide info of pods
// scheduled by other schedulers.
HostToPods map[string][]*v1.Pod
// Key: node name
// Value: set of system call names merged from the pods recorded on that node
HostSyscalls map[string]sets.Set[string]
// ExSAvg is a running average of the totals computed by Score.
ExSAvg float64
// ExSAvgCount is the divisor used for the next ExSAvg update; New starts it at 1.
ExSAvgCount int64
// DefaultProfileNamespace and DefaultProfileName identify the SeccompProfile CR
// that is read when a pod has no resolvable profile of its own.
DefaultProfileNamespace string
DefaultProfileName string
// WeightedSyscallProfile is not referenced by the code in this file.
WeightedSyscallProfile string
}
// Compile-time check that *SySched satisfies framework.ScorePlugin.
var _ framework.ScorePlugin = &SySched{}
const (
	// Name is the name of the plugin used in Registry and configurations.
	Name = "SySched"

	// SPO_ANNOTATION is the substring looked for in pod annotation keys to
	// recognize seccomp profile annotations.
	SPO_ANNOTATION = "seccomp.security.alpha.kubernetes.io"
)
// remove deletes the element at index i from s without preserving order: the
// last element is moved into slot i and the slice is shortened by one.
//
// The backing array of s is modified in place. An empty slice yields nil. An
// index outside [0, len(s)) leaves s untouched and returns it as-is instead of
// panicking.
func remove(s []*v1.Pod, i int) []*v1.Pod {
	if len(s) == 0 {
		return nil
	}
	if i < 0 || i >= len(s) {
		return s
	}
	last := len(s) - 1
	s[i] = s[last]
	// Clear the vacated slot so the backing array does not keep a stale
	// pointer to the moved pod reachable.
	s[last] = nil
	return s[:last]
}
// parseNameNS splits a relative seccomp profile path into the namespace and
// the profile name it refers to. Accepted shapes include
//
//	localhost/operator/<namespace>/<filename>.json
//	operator/<namespace>/<filename>.json
//
// The namespace is the second-to-last path segment and the name is the last
// segment with its extension stripped. The results are returned in the order
// (namespace, name). A path with fewer than two segments yields two empty
// strings.
func parseNameNS(profilePath string) (string, string) {
	segments := strings.Split(profilePath, "/")
	count := len(segments)
	if count < 2 {
		// Covers the empty path as well: splitting "" gives a single segment.
		return "", ""
	}

	file := segments[count-1]
	// path.Ext is always a suffix of file, so slicing it off drops the extension.
	base := file[:len(file)-len(path.Ext(file))]
	return segments[count-2], base
}
// readSPOProfileCR fetches the SeccompProfile custom resource identified by
// name and namespace and returns the syscall names it lists.
//
// Only entries whose action is seccomp.ActAllow or seccomp.ActLog contribute
// to the result; entries with any other action are skipped. An empty name or
// namespace yields an empty set without performing a lookup. When the lookup
// fails, the empty set is returned together with the client's error.
func (sc *SySched) readSPOProfileCR(name string, namespace string) (sets.Set[string], error) {
	collected := sets.New[string]()
	if name == "" || namespace == "" {
		return collected, nil
	}

	// Look the CR up by namespace and CR name.
	key := client.ObjectKey{Namespace: namespace, Name: name}
	profile := &v1beta1.SeccompProfile{}
	if err := sc.client.Get(context.TODO(), key, profile); err != nil {
		return collected, err
	}

	// A profile groups syscalls by action; fold the relevant groups together.
	// NOTE: should we consider the other categories, e.g., notify, trace?
	for _, group := range profile.Spec.Syscalls {
		switch group.Action {
		case seccomp.ActAllow, seccomp.ActLog:
			collected.Insert(group.Names...)
		}
	}
	return collected, nil
}
// getSyscalls returns the set of system calls associated with pod, derived
// from the SeccompProfile CRs the pod refers to.
//
// Profiles are collected from three places and merged:
//   - the pod-level security context (Localhost seccomp profiles only),
//   - each container's security context (Localhost seccomp profiles only),
//   - every pod annotation whose key contains SPO_ANNOTATION.
//
// A profile path that cannot be parsed, or a CR that cannot be read, adds
// nothing to the result; read failures are logged. If nothing was collected,
// the default profile (DefaultProfileName in DefaultProfileNamespace) is read
// instead, so the returned set is empty only when that fallback is empty or
// fails as well.
func (sc *SySched) getSyscalls(pod *v1.Pod) sets.Set[string] {
	r := sets.New[string]()

	// mergeProfile resolves profilePath to a SeccompProfile CR and folds its
	// syscalls into r.
	mergeProfile := func(profilePath, errMsg string) {
		ns, name := parseNameNS(profilePath)
		if len(ns) == 0 || len(name) == 0 {
			return
		}
		syscalls, err := sc.readSPOProfileCR(name, ns)
		if err != nil {
			klog.ErrorS(err, errMsg)
			return
		}
		r = r.Union(syscalls)
	}

	// localhostPath extracts the profile path from a Localhost seccomp profile.
	localhostPath := func(p *v1.SeccompProfile) (string, bool) {
		if p == nil || p.Type != v1.SeccompProfileTypeLocalhost || p.LocalhostProfile == nil {
			return "", false
		}
		return *p.LocalhostProfile, true
	}

	// read the seccomp profile from the security context of a pod
	if podSC := pod.Spec.SecurityContext; podSC != nil {
		if profilePath, ok := localhostPath(podSC.SeccompProfile); ok {
			mergeProfile(profilePath, "Failed to read syscall CR by parsing pod security context")
		}
	}

	// read the seccomp profile from container security context and merge them
	for _, container := range pod.Spec.Containers {
		conSC := container.SecurityContext
		if conSC == nil {
			continue
		}
		if profilePath, ok := localhostPath(conSC.SeccompProfile); ok {
			mergeProfile(profilePath, "Failed to read syscall CR by parsing container security context")
		}
	}

	// SPO seccomp profiles are sometimes automatically annotated to a pod.
	// There could be multiple such annotations; every one of them is merged.
	// Stopping at the first match would make the result depend on map
	// iteration order.
	for k, v := range pod.ObjectMeta.Annotations {
		if strings.Contains(k, SPO_ANNOTATION) {
			mergeProfile(v, "Failed to read syscall CR by parsing pod annotation")
		}
	}

	// if a pod does not have a seccomp profile specified, return the set of all syscalls
	if len(r) == 0 {
		syscalls, err := sc.readSPOProfileCR(sc.DefaultProfileName, sc.DefaultProfileNamespace)
		if err != nil {
			klog.ErrorS(err, "Failed to read the CR of all syscalls")
		}
		if syscalls.Len() > 0 {
			r = r.Union(syscalls)
		}
	}

	return r
}
// Name returns the plugin name, i.e. the package-level Name constant.
// It is used in logs, etc.
func (sc *SySched) Name() string {
return Name
}
// calcScore turns a set of syscalls into a numeric score.
//
// Critical/CVE syscalls are meant to be weighted separately, but the critical
// count is fixed at zero and the weight at one for now, so the score currently
// equals the number of syscalls in the set.
func (sc *SySched) calcScore(syscalls sets.Set[string]) int {
	// TODO: add critical/cve syscalls
	// TODO: adjust weight for critical/cve syscalls
	const weight = 1 // NOTE: hardcoded for now
	critical := 0

	ordinary := syscalls.Len() - critical
	score := ordinary + weight*critical

	klog.V(10).InfoS("Score: ", "score", score, "tot_crit", critical)
	return score
}
// Score is invoked at the score extension point. It returns the number of
// extraneous syscalls that placing pod on nodeName would expose: the syscalls
// already on the host that pod does not need, plus, for every pod recorded on
// that host, the syscalls of the combined host set that this pod does not need.
//
// Special cases:
//   - if the node cannot be fetched, the error is logged and 0 is returned;
//   - if no syscalls can be determined for pod, math.MaxInt64 is returned;
//   - if nothing is recorded for the node, 0 is returned.
//
// The returned status is always nil. Each fully computed score also updates
// the running average kept in ExSAvg / ExSAvgCount.
func (sc *SySched) Score(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	// Read directly from API server because cached state in SnapSharedLister not always up-to-date
	// especially during initial scheduler start.
	node, err := sc.handle.ClientSet().CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		// Scoring stays best-effort, but the failure should not vanish silently.
		klog.ErrorS(err, "Failed to get node while scoring", "node", nodeName, "pod", pod.Name)
		return 0, nil
	}

	podSyscalls := sc.getSyscalls(pod)

	// NOTE: this condition is true only when a pod does not
	// have a syscall profile, or the unconfined syscall is
	// not set. We return a large number (INT64_MAX) as score.
	if len(podSyscalls) == 0 {
		return math.MaxInt64, nil
	}

	_, hostSyscalls := sc.getHostSyscalls(node.Name)

	// when a host or node does not have any pods
	// running, the extraneous syscall score is zero
	if hostSyscalls == nil {
		return 0, nil
	}

	// syscalls present on the host that the new pod itself does not use
	totalDiffs := sc.calcScore(hostSyscalls.Difference(podSyscalls))

	// add the difference existing pods will see if new Pod is added into this host.
	// Union already returns a fresh set, so hostSyscalls itself is not modified.
	newHostSyscalls := hostSyscalls.Union(podSyscalls)
	for _, p := range sc.HostToPods[node.Name] {
		existing := sc.getSyscalls(p)
		totalDiffs += sc.calcScore(newHostSyscalls.Difference(existing))
	}

	// incremental mean: ExSAvgCount is the 1-based index of this sample
	sc.ExSAvg = sc.ExSAvg + (float64(totalDiffs)-sc.ExSAvg)/float64(sc.ExSAvgCount)
	sc.ExSAvgCount += 1

	klog.V(10).Info("ExSAvg: ", sc.ExSAvg)
	klog.V(10).InfoS("Score: ", "totalDiffs", totalDiffs, "pod", pod.Name, "node", nodeName)

	return int64(totalDiffs), nil
}
// NormalizeScore rescales scores in place by delegating to
// helper.DefaultNormalizeScore with framework.MaxNodeScore as the upper bound
// and the reverse flag set. The score list is logged before and after.
func (sc *SySched) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
	podName := pod.Name

	klog.V(10).InfoS("Original: ", "scores", scores, "pod", podName)
	status := helper.DefaultNormalizeScore(framework.MaxNodeScore, true, scores)
	klog.V(10).InfoS("Normalized: ", "scores", scores, "pod", podName)

	return status
}
// ScoreExtensions returns the plugin itself as its score extensions;
// SySched provides NormalizeScore.
func (sc *SySched) ScoreExtensions() framework.ScoreExtensions {
return sc
}
// getHostSyscalls returns the number of syscalls recorded for nodeName along
// with the set itself. When nothing is recorded for the node it returns
// (0, nil).
func (sc *SySched) getHostSyscalls(nodeName string) (int, sets.Set[string]) {
	if recorded, found := sc.HostSyscalls[nodeName]; found {
		return recorded.Len(), recorded
	}

	klog.V(5).Infof("getHostSyscalls: no nodeName %s", nodeName)
	return 0, nil
}
// updateHostSyscalls merges pod's syscalls into the set recorded for the node
// named in pod.Spec.NodeName.
func (sc *SySched) updateHostSyscalls(pod *v1.Pod) {
	podCalls := sc.getSyscalls(pod)

	node := pod.Spec.NodeName
	current := sc.HostSyscalls[node]
	sc.HostSyscalls[node] = current.Union(podCalls)
}
// addPod records pod under its node (pod.Spec.NodeName) and merges the pod's
// syscalls into that node's syscall set.
//
// A pod already recorded on the node is ignored. Pods are identified by
// namespace and name together, so same-named pods from different namespaces
// are tracked separately.
func (sc *SySched) addPod(pod *v1.Pod) {
	nodeName := pod.Spec.NodeName

	pods, known := sc.HostToPods[nodeName]
	if !known {
		// First pod seen on this node: start from an empty syscall set.
		sc.HostSyscalls[nodeName] = sets.New[string]()
	}

	for _, p := range pods {
		if p.Namespace == pod.Namespace && p.Name == pod.Name {
			return
		}
	}

	sc.HostToPods[nodeName] = append(pods, pod)
	sc.updateHostSyscalls(pod)
}
// recomputeHostSyscalls returns the union of the syscall sets of all given
// pods. It returns an empty set when pods is empty.
func (sc *SySched) recomputeHostSyscalls(pods []*v1.Pod) sets.Set[string] {
	merged := sets.New[string]()
	for i := range pods {
		merged = merged.Union(sc.getSyscalls(pods[i]))
	}
	return merged
}
// removePod drops pod from the list recorded for its node
// (pod.Spec.NodeName) and rebuilds that node's syscall set from the pods that
// remain.
//
// The pod is matched by namespace and name together, so a same-named pod from
// another namespace is never removed by mistake. Nothing happens if the node
// or the pod is not recorded.
func (sc *SySched) removePod(pod *v1.Pod) {
	nodeName := pod.Spec.NodeName

	pods, ok := sc.HostToPods[nodeName]
	if !ok {
		klog.V(5).Infof("removePod: Host %s not yet cached", nodeName)
		return
	}

	for i, p := range pods {
		if p.Namespace != pod.Namespace || p.Name != pod.Name {
			continue
		}

		pods = remove(pods, i)
		sc.HostToPods[nodeName] = pods
		// Sets cannot be "subtracted" safely because other pods may share
		// syscalls, so the host set is rebuilt from scratch.
		sc.HostSyscalls[nodeName] = sc.recomputeHostSyscalls(pods)

		c, _ := sc.getHostSyscalls(nodeName)
		klog.V(5).InfoS("remaining ", "syscalls", c, "node", nodeName)
		return
	}
}
// podAdded is the informer add handler. Only pods that are already in the
// Running phase are recorded; this covers pods that were running before this
// scheduler came up.
func (sc *SySched) podAdded(obj interface{}) {
	pod := obj.(*v1.Pod)
	if pod.Status.Phase != v1.PodRunning {
		return
	}

	klog.V(10).Infof("POD ADDED: %s/%s phase: %s", pod.Namespace, pod.Name, pod.Status.Phase)
	sc.addPod(pod)
}
// podUpdated is the informer update handler. A pod is recorded once it is
// Pending and has a host IP, i.e. it has been assigned to a node.
//
// NOTE(review): the check and the recorded object both use the old version of
// the pod, not the new one — confirm this is intended.
func (sc *SySched) podUpdated(old, new interface{}) {
	prev := old.(*v1.Pod)

	stillPending := prev.Status.Phase == v1.PodPending
	hasHost := prev.Status.HostIP != ""
	if !stillPending || !hasHost {
		return
	}

	klog.V(10).Infof("POD UPDATED. %s/%s", prev.Namespace, prev.Name)
	sc.addPod(prev)
}
// podDeleted is the informer delete handler. It removes the deleted pod from
// the per-node bookkeeping.
//
// obj is either a *v1.Pod or a cache.DeletedFinalStateUnknown tombstone
// wrapping one; the informer delivers the latter when the delete event itself
// was missed. Anything else is logged and ignored rather than causing a panic.
func (sc *SySched) podDeleted(obj interface{}) {
	var pod *v1.Pod
	switch t := obj.(type) {
	case *v1.Pod:
		pod = t
	case cache.DeletedFinalStateUnknown:
		p, ok := t.Obj.(*v1.Pod)
		if !ok {
			klog.ErrorS(nil, "Cannot convert tombstone object to *v1.Pod", "obj", t.Obj)
			return
		}
		pod = p
	default:
		klog.ErrorS(nil, "Cannot convert deleted object to *v1.Pod", "obj", obj)
		return
	}

	klog.V(10).Infof("POD DELETED: %s/%s", pod.Namespace, pod.Name)
	sc.removePod(pod)
}
// getArgs returns obj as the plugin's *pluginconfig.SySchedArgs, or an error
// naming the actual type when obj is something else.
func getArgs(obj runtime.Object) (*pluginconfig.SySchedArgs, error) {
	if args, isArgs := obj.(*pluginconfig.SySchedArgs); isArgs {
		return args, nil
	}
	return nil, fmt.Errorf("want args to be of type SySchedArgs, got %T", obj)
}
// New initializes a new plugin and returns it.
//
// obj must be a *pluginconfig.SySchedArgs; it supplies the namespace and name
// of the default syscall profile CR. New builds a client whose scheme knows
// the types registered below and registers add/update/delete handlers on the
// shared pod informer.
//
// An error is returned when the arguments have the wrong type, when a type
// cannot be registered with the scheme, when the client cannot be created, or
// when the event handlers cannot be registered.
func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
	args, err := getArgs(obj)
	if err != nil {
		return nil, err
	}

	scheme := runtime.NewScheme()
	if err := clientscheme.AddToScheme(scheme); err != nil {
		return nil, fmt.Errorf("adding client-go types to scheme: %w", err)
	}
	if err := v1.AddToScheme(scheme); err != nil {
		return nil, fmt.Errorf("adding core/v1 types to scheme: %w", err)
	}
	if err := v1alpha1.AddToScheme(scheme); err != nil {
		return nil, fmt.Errorf("adding scheduling v1alpha1 types to scheme: %w", err)
	}
	// Without the SeccompProfile types the client cannot decode profile CRs.
	if err := v1beta1.AddToScheme(scheme); err != nil {
		return nil, fmt.Errorf("adding seccompprofile v1beta1 types to scheme: %w", err)
	}

	// Named c so that the imported client package is not shadowed.
	c, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
	if err != nil {
		return nil, err
	}

	sc := &SySched{
		handle:       handle,
		client:       c,
		HostToPods:   make(map[string][]*v1.Pod),
		HostSyscalls: make(map[string]sets.Set[string]),
		ExSAvg:       0,
		// Score divides by the count before incrementing it, so it starts at 1.
		ExSAvgCount: 1,
		// default syscall profile CR namespace and name for all syscalls
		DefaultProfileNamespace: args.DefaultProfileNamespace,
		DefaultProfileName:      args.DefaultProfileName,
	}

	podInformer := handle.SharedInformerFactory().Core().V1().Pods()
	if _, err := podInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    sc.podAdded,
			UpdateFunc: sc.podUpdated,
			DeleteFunc: sc.podDeleted,
		},
	); err != nil {
		return nil, fmt.Errorf("registering pod event handlers: %w", err)
	}

	return sc, nil
}