pkg/appmgmt/general/general.go (195 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package general
import (
"strconv"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8sCache "k8s.io/client-go/tools/cache"
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/log"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
"go.uber.org/zap"
)
// Manager is the general-purpose application management service; it implements
// interfaces#Recoverable and interfaces#AppManager.
//
// It watches pod events and recognizes applications from pod metadata: when a
// pod carries the required information (such as an application ID and a queue
// name) it is claimed as an application or as a task of an application, and the
// corresponding event is forwarded to the pod event handler, which reports it to
// the scheduler cache through the application management protocol.
type Manager struct {
	// apiProvider is used to register pod informer handlers and to list pods.
	apiProvider client.APIProvider

	// gangSchedulingDisabled holds the DisableGangScheduling value read from the
	// scheduler configuration when the manager was created.
	gangSchedulingDisabled bool

	// podEventHandler receives the add, update and delete pod events.
	podEventHandler *PodEventHandler
}
// NewManager creates a general app management service that uses apiProvider for
// informer access and forwards pod events to podEventHandler. The gang
// scheduling switch is captured from the scheduler configuration at the time of
// the call.
func NewManager(apiProvider client.APIProvider, podEventHandler *PodEventHandler) *Manager {
	schedulerConf := conf.GetSchedulerConf()
	mgr := &Manager{
		podEventHandler:        podEventHandler,
		apiProvider:            apiProvider,
		gangSchedulingDisabled: schedulerConf.DisableGangScheduling,
	}
	return mgr
}
// Name returns the identifier of this app management service.
// It is part of the AppManagementService interface.
func (os *Manager) Name() string {
	const serviceName = "general"
	return serviceName
}
// ServiceInit registers this manager's pod informer callbacks with the API
// provider, using filterPods as the filter and AddPod, updatePod and deletePod
// as the add, update and delete callbacks. It always returns nil.
// It is part of the AppManagementService interface.
func (os *Manager) ServiceInit() error {
	podHandlers := &client.ResourceEventHandlers{
		Type:     client.PodInformerHandlers,
		FilterFn: os.filterPods,
		AddFn:    os.AddPod,
		UpdateFn: os.updatePod,
		DeleteFn: os.deletePod,
	}
	os.apiProvider.AddEventHandler(podHandlers)
	return nil
}
// Start is a no-op that always returns nil. The general app manager works off
// the shared context, so it has no goroutine or extra service to launch.
// It is part of the AppManagementService interface.
func (os *Manager) Start() error {
	return nil
}
// Stop is a no-op: Start launches nothing, so there is nothing to shut down.
// It is part of the AppManagementService interface.
func (os *Manager) Stop() {}
// isStateAwareDisabled reports whether the pod's constants.LabelDisableStateAware
// label holds a value that strconv.ParseBool reads as true.
// A missing/empty label yields false. An unparsable value also yields false, and
// the parse error is logged at debug level.
func isStateAwareDisabled(pod *v1.Pod) bool {
	raw := utils.GetPodLabelValue(pod, constants.LabelDisableStateAware)
	if raw == "" {
		return false
	}
	disabled, parseErr := strconv.ParseBool(raw)
	if parseErr == nil {
		return disabled
	}
	log.Log(log.ShimAppMgmtGeneral).Debug("unable to parse label for pod",
		zap.String("namespace", pod.Namespace),
		zap.String("name", pod.Name),
		zap.String("label", constants.LabelDisableStateAware),
		zap.Error(parseErr))
	return false
}
// getOwnerReference returns the owner references to attach to placeholder pods:
// a single reference to the originating pod that is not marked as controller
// and has BlockOwnerDeletion set to true.
func getOwnerReference(pod *v1.Pod) []metav1.OwnerReference {
	isController := false
	blocksOwnerDeletion := true
	return []metav1.OwnerReference{
		{
			APIVersion:         "v1",
			Kind:               "Pod",
			Name:               pod.Name,
			UID:                pod.UID,
			Controller:         &isController,
			BlockOwnerDeletion: &blocksOwnerDeletion,
		},
	}
}
// filterPods decides which pod informer events this manager handles: only pods
// that carry an application ID are accepted.
//
// Delete notifications may carry a k8sCache.DeletedFinalStateUnknown tombstone
// instead of a *v1.Pod when the watch missed the pod's final state. The
// tombstone is unwrapped and its pod is filtered the same way. Rejecting every
// tombstone would drop those deletes before deletePod, which explicitly handles
// DeletedFinalStateUnknown, ever sees them.
func (os *Manager) filterPods(obj interface{}) bool {
	switch t := obj.(type) {
	case *v1.Pod:
		return utils.GetApplicationIDFromPod(t) != ""
	case k8sCache.DeletedFinalStateUnknown:
		pod, ok := t.Obj.(*v1.Pod)
		return ok && utils.GetApplicationIDFromPod(pod) != ""
	default:
		return false
	}
}
// AddPod is the informer add callback. It converts obj to a pod and forwards an
// AddPod event for it to the pod event handler. If obj cannot be converted, the
// error is logged and the event is dropped.
// Visibility: Public only for testing
func (os *Manager) AddPod(obj interface{}) {
	pod, convErr := utils.Convert2Pod(obj)
	logger := log.Log(log.ShimAppMgmtGeneral)
	if convErr != nil {
		logger.Error("failed to add pod", zap.Error(convErr))
		return
	}
	logger.Debug("pod added",
		zap.String("appType", os.Name()),
		zap.String("Name", pod.Name),
		zap.String("Namespace", pod.Namespace))
	os.podEventHandler.HandleEvent(AddPod, Informers, pod)
}
// updatePod is the informer update callback. Only one transition is acted on:
// the pod phase changed and utils.IsPodTerminated reports the new pod as
// terminated. A succeeded or failed pod will not have its containers restarted,
// so an UpdatePod event is forwarded to let the task complete and its
// allocation be released. All other updates are ignored. If either object is
// not a pod, the error is logged and the update is dropped.
func (os *Manager) updatePod(oldObj, newObj interface{}) {
	oldPod, oldErr := utils.Convert2Pod(oldObj)
	if oldErr != nil {
		log.Log(log.ShimAppMgmtGeneral).Error("expecting a pod object", zap.Error(oldErr))
		return
	}
	newPod, newErr := utils.Convert2Pod(newObj)
	if newErr != nil {
		log.Log(log.ShimAppMgmtGeneral).Error("expecting a pod object", zap.Error(newErr))
		return
	}

	phaseChanged := oldPod.Status.Phase != newPod.Status.Phase
	if !phaseChanged || !utils.IsPodTerminated(newPod) {
		return
	}

	log.Log(log.ShimAppMgmtGeneral).Info("task completes",
		zap.String("appType", os.Name()),
		zap.String("namespace", newPod.Namespace),
		zap.String("podName", newPod.Name),
		zap.String("podUID", string(newPod.UID)),
		zap.String("podStatus", string(newPod.Status.Phase)))
	os.podEventHandler.HandleEvent(UpdatePod, Informers, newPod)
}
// deletePod is the informer delete callback. It accepts either a *v1.Pod or a
// k8sCache.DeletedFinalStateUnknown tombstone wrapping a pod, and forwards a
// DeletePod event for that pod to the pod event handler. What happens to the
// corresponding task and application is decided by the handler. Any other
// object, or a tombstone whose content is not a pod, is logged as an error and
// ignored.
func (os *Manager) deletePod(obj interface{}) {
	var pod *v1.Pod
	switch deleted := obj.(type) {
	case *v1.Pod:
		pod = deleted
	case k8sCache.DeletedFinalStateUnknown:
		unwrapped, convErr := utils.Convert2Pod(deleted.Obj)
		if convErr != nil {
			log.Log(log.ShimAppMgmtGeneral).Error(convErr.Error())
			return
		}
		pod = unwrapped
	default:
		log.Log(log.ShimAppMgmtGeneral).Error("cannot convert to pod")
		return
	}

	log.Log(log.ShimAppMgmtGeneral).Info("delete pod",
		zap.String("appType", os.Name()),
		zap.String("namespace", pod.Namespace),
		zap.String("podName", pod.Name),
		zap.String("podUID", string(pod.UID)))
	os.podEventHandler.HandleEvent(DeletePod, Informers, pod)
}
// ListPods returns the pods that are candidates for recovery. A pod qualifies
// when it meets all three conditions:
//   - it carries an application ID,
//   - utils.IsAssignedPod reports it as assigned,
//   - getAppMetadata(pod, true) succeeds.
//
// Pods that pass the first two checks but yield no metadata are only counted.
// Pods are read through the pod informer's lister, and a lister error is
// returned unchanged. The returned slice is never nil on success.
func (os *Manager) ListPods() ([]*v1.Pod, error) {
	log.Log(log.ShimAppMgmtGeneral).Info("Retrieving pod list")
	allPods, err := os.apiProvider.GetAPIs().PodInformer.Lister().List(labels.Everything())
	if err != nil {
		return nil, err
	}
	log.Log(log.ShimAppMgmtGeneral).Info("Pod list retrieved from api server", zap.Int("nr of pods", len(allPods)))

	recoveredApps := make(map[string]struct{})
	withoutMetadata := 0
	candidates := make([]*v1.Pod, 0)
	for _, candidate := range allPods {
		log.Log(log.ShimAppMgmtGeneral).Debug("Looking at pod for recovery candidates", zap.String("podNamespace", candidate.Namespace), zap.String("podName", candidate.Name))
		// a pod without an application ID, or one not yet assigned, was never
		// scheduled for an existing app and is not a recovery candidate
		if utils.GetApplicationIDFromPod(candidate) == "" || !utils.IsAssignedPod(candidate) {
			continue
		}
		meta, ok := getAppMetadata(candidate, true)
		if !ok {
			withoutMetadata++
			continue
		}
		candidates = append(candidates, candidate)
		log.Log(log.ShimAppMgmtGeneral).Debug("Adding appID as recovery candidate", zap.String("appID", meta.ApplicationID))
		recoveredApps[meta.ApplicationID] = struct{}{}
	}

	log.Log(log.ShimAppMgmtGeneral).Info("Application recovery statistics",
		zap.Int("nr of recoverable apps", len(recoveredApps)),
		zap.Int("nr of total pods", len(allPods)),
		zap.Int("nr of pods without application metadata", withoutMetadata),
		zap.Int("nr of pods to be recovered", len(candidates)))
	return candidates, nil
}
// GetExistingAllocation rebuilds the scheduler-interface allocation for a pod
// during recovery. It returns nil when getAppMetadata(pod, false) reports no
// valid application metadata for the pod.
//
// The pod UID is used as both AllocationKey and UUID, which keeps it consistent
// with the key used when the task was submitted. NodeID comes from
// pod.Spec.NodeName, and the partition is always constants.DefaultPartition.
// The pod creation time (Unix seconds) is stored under the
// siCommon.CreationTime tag. This writes into the Tags map returned by
// getAppMetadata, and that map becomes the allocation's tags.
func (os *Manager) GetExistingAllocation(pod *v1.Pod) *si.Allocation {
	meta, valid := getAppMetadata(pod, false)
	if !valid {
		return nil
	}

	podUID := string(pod.UID)
	meta.Tags[siCommon.CreationTime] = strconv.FormatInt(pod.CreationTimestamp.Unix(), 10)
	return &si.Allocation{
		AllocationKey:    podUID,
		AllocationTags:   meta.Tags,
		UUID:             podUID,
		ResourcePerAlloc: common.GetPodResource(pod),
		NodeID:           pod.Spec.NodeName,
		ApplicationID:    meta.ApplicationID,
		Placeholder:      utils.GetPlaceholderFlagFromPodSpec(pod),
		TaskGroupName:    utils.GetTaskGroupFromPodSpec(pod),
		PartitionName:    constants.DefaultPartition,
	}
}