perf-tools/framework/app_manager.go (284 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"fmt"
"regexp"
"time"
"github.com/apache/yunikorn-release/perf-tools/constants"
"github.com/apache/yunikorn-release/perf-tools/utils"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// AppManager abstracts the lifecycle of a test application: creating it,
// deleting it, refreshing its observed status, and blocking until it reaches a
// desired state. DeploymentsAppManager is the implementation in this file.
type AppManager interface {
	// Create creates the workloads of the app described by appInfo and assigns
	// them to the scheduler named schedulerName.
	Create(schedulerName string, appInfo *AppInfo) error
	// Delete removes the workloads that belong to the app.
	Delete(appInfo *AppInfo) error
	// RefreshAppStatus updates the app-level status stored in appInfo.
	RefreshAppStatus(appInfo *AppInfo) error
	// RefreshTasksStatusAfterRunning updates the per-task status stored in
	// appInfo; as the name indicates, it is meant to be called once the app is
	// running.
	RefreshTasksStatusAfterRunning(appInfo *AppInfo) error
	// WaitForAppsToBeCleanedUp blocks until the app is cleaned up or timeout
	// elapses.
	WaitForAppsToBeCleanedUp(appInfos *AppInfo, timeout time.Duration) error
	// WaitForAppsToBeSatisfied blocks until the app is running as requested or
	// timeout elapses.
	WaitForAppsToBeSatisfied(appInfos *AppInfo, timeout time.Duration) error
	// CreateWaitAndRefreshTasksStatus creates an app, waits for it to be
	// running, and finally refreshes the status of its tasks.
	CreateWaitAndRefreshTasksStatus(schedulerName string, appInfo *AppInfo, timeout time.Duration) error
	// DeleteWait deletes an app and waits for it to be cleaned up.
	DeleteWait(appInfo *AppInfo, timeout time.Duration) error
}
// DeploymentsAppManager is an AppManager that represents every request of an
// app as one Kubernetes Deployment.
type DeploymentsAppManager struct {
	// kubeClient performs all Kubernetes API calls (deployments and pods).
	kubeClient *utils.KubeClient
	// nameRegexp matches the characters of an app ID that are replaced by "-"
	// when a deployment name is derived from it.
	nameRegexp *regexp.Regexp
}
// NewDeploymentsAppManager returns an AppManager that manages apps as
// Kubernetes Deployments through the given kubeClient.
func NewDeploymentsAppManager(kubeClient *utils.KubeClient) AppManager {
	return &DeploymentsAppManager{
		kubeClient: kubeClient,
		// `[_\W]` matches every character outside [0-9A-Za-z]. The pattern is a
		// compile-time constant, so MustCompile cannot panic at runtime; using it
		// avoids both discarding the error and shadowing the regexp package.
		nameRegexp: regexp.MustCompile(`[_\W]`),
	}
}
// Create creates one Deployment per entry of appInfo.RequestInfos in
// appInfo.Namespace. Every deployment runs a single container: the first
// container of appInfo.PodSpec when one is defined, otherwise a default
// container built from constants.DefaultContainerName/DefaultContainerImage.
// Request and limit resources of the request entry are layered on top of the
// container's own resources. Pods are labelled with the app ID and queue and
// are assigned to schedulerName.
//
// It returns an error when the app defines no requests, when a resource
// quantity cannot be parsed, or when a deployment cannot be created; the
// deployments created before the failure are left in place.
func (dam *DeploymentsAppManager) Create(schedulerName string, appInfo *AppInfo) error {
	if len(appInfo.RequestInfos) == 0 {
		return fmt.Errorf("request info not defined for app %s", appInfo.AppID)
	}
	for reqIndex, requestInfo := range appInfo.RequestInfos {
		// init container
		var container apiv1.Container
		if len(appInfo.PodSpec.Containers) > 0 {
			container = appInfo.PodSpec.Containers[0]
		} else {
			container.Name = constants.DefaultContainerName
			container.Image = constants.DefaultContainerImage
		}
		if requestInfo.RequestResources != nil {
			// Copying the struct above still shares the resource maps with the
			// template in appInfo; work on a deep copy so that overrides neither
			// mutate appInfo nor leak into the deployments of other requests.
			requests := container.Resources.Requests.DeepCopy()
			if requests == nil {
				requests = apiv1.ResourceList{}
			}
			for resourceName, resourceValue := range requestInfo.RequestResources {
				quantity, err := resource.ParseQuantity(resourceValue)
				if err != nil {
					return fmt.Errorf("invalid request resource %v=%q for app %s: %w",
						resourceName, resourceValue, appInfo.AppID, err)
				}
				requests[apiv1.ResourceName(resourceName)] = quantity
			}
			container.Resources.Requests = requests
		}
		if requestInfo.LimitResources != nil {
			// Same isolation as for the requests above.
			limits := container.Resources.Limits.DeepCopy()
			if limits == nil {
				limits = apiv1.ResourceList{}
			}
			for resourceName, resourceValue := range requestInfo.LimitResources {
				quantity, err := resource.ParseQuantity(resourceValue)
				if err != nil {
					return fmt.Errorf("invalid limit resource %v=%q for app %s: %w",
						resourceName, resourceValue, appInfo.AppID, err)
				}
				limits[apiv1.ResourceName(resourceName)] = quantity
			}
			container.Resources.Limits = limits
		}
		// Take a per-iteration copy so the deployment does not hold a pointer to
		// the loop variable.
		replicas := requestInfo.Number
		// init and create deployment
		deployment := &appsv1.Deployment{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: appInfo.Namespace,
				Name:      dam.getDeploymentName(appInfo, reqIndex),
			},
			Spec: appsv1.DeploymentSpec{
				Replicas: &replicas,
				Selector: &metav1.LabelSelector{
					MatchLabels: map[string]string{
						constants.LabelAppID: appInfo.AppID,
					},
				},
				Template: apiv1.PodTemplateSpec{
					ObjectMeta: metav1.ObjectMeta{
						Labels: map[string]string{
							constants.LabelAppID: appInfo.AppID,
							constants.LabelQueue: appInfo.Queue,
						},
						Annotations: appInfo.PodTemplateSpec.ObjectMeta.Annotations,
					},
					Spec: apiv1.PodSpec{
						SchedulerName: schedulerName,
						HostNetwork:   appInfo.PodSpec.HostNetwork,
						Containers: []apiv1.Container{
							container,
						},
						Tolerations:       appInfo.PodSpec.Tolerations,
						NodeSelector:      appInfo.PodSpec.NodeSelector,
						PriorityClassName: requestInfo.PriorityClass,
					},
				},
			},
		}
		if err := dam.kubeClient.CreateDeployment(appInfo.Namespace, deployment); err != nil {
			return err
		}
	}
	return nil
}
// getDeploymentName derives the name of the deployment that backs the request
// at reqIndex: every character of the app ID matched by nameRegexp is replaced
// with "-" and the request index is appended as a "-<index>" suffix.
func (dam *DeploymentsAppManager) getDeploymentName(appInfo *AppInfo, reqIndex int) string {
	return fmt.Sprintf("%s-%d", dam.nameRegexp.ReplaceAllString(appInfo.AppID, "-"), reqIndex)
}
// Delete removes the deployment of every request of the app, in request order.
// It stops at, and returns, the first deletion error; deployments of later
// requests are then left untouched.
func (dam *DeploymentsAppManager) Delete(appInfo *AppInfo) error {
	for reqIndex := range appInfo.RequestInfos {
		deploymentName := dam.getDeploymentName(appInfo, reqIndex)
		if err := dam.kubeClient.DeleteDeployment(appInfo.Namespace, deploymentName); err != nil {
			return err
		}
	}
	return nil
}
// RefreshAppStatus sums the three metrics reported by GetDeploymentInfo over
// all deployments of the app and stores the totals via appInfo.SetAppStatus
// (presumably desired/created/ready counts, matching the AppStatus fields read
// by the wait helpers - confirm against KubeClient.GetDeploymentInfo).
//
// The refresh is best effort: a deployment whose info cannot be fetched (for
// example because it has already been deleted) is logged and contributes
// nothing to the totals. The method therefore always returns nil.
func (dam *DeploymentsAppManager) RefreshAppStatus(appInfo *AppInfo) error {
	var summaryMetrics [3]int
	for i := range appInfo.RequestInfos {
		deploymentName := dam.getDeploymentName(appInfo, i)
		// The creation time returned first is not needed for the app status.
		_, metrics, err := dam.kubeClient.GetDeploymentInfo(appInfo.Namespace, deploymentName)
		if err != nil {
			utils.Logger.Info("failed to refresh app status",
				zap.String("appID", appInfo.AppID),
				zap.String("deployment", deploymentName),
				zap.Error(err))
			continue
		}
		summaryMetrics[0] += metrics[0]
		summaryMetrics[1] += metrics[1]
		summaryMetrics[2] += metrics[2]
	}
	appInfo.SetAppStatus(summaryMetrics[0], summaryMetrics[1], summaryMetrics[2])
	return nil
}
// RefreshTasksStatusAfterRunning lists the pods labelled with the app ID and
// rebuilds appInfo.TasksStatus from them. For every pod it records the
// creation time, the start time and the last transition time of each pod
// condition, ordered as returned by GetOrderedTaskConditionTypes. It also sets
// appInfo.AppStatus.CreateTime to the earliest pod creation time and
// appInfo.AppStatus.RunningTime to the latest ContainersReady transition time.
//
// It is meant to be called once all pods are running. An error is returned,
// and appInfo is left unmodified, when the pods cannot be listed or when a pod
// has no start time, no container, or lacks one of the expected conditions.
func (dam *DeploymentsAppManager) RefreshTasksStatusAfterRunning(appInfo *AppInfo) error {
	podList, err := dam.kubeClient.GetPods(appInfo.Namespace,
		utils.GetListOptions(map[string]string{constants.LabelAppID: appInfo.AppID}))
	if err != nil {
		return err
	}
	orderedCondTypes := GetOrderedTaskConditionTypes()
	tasksStatus := make(map[string]*TaskStatus)
	maxRunningTime := time.Time{}
	firstCreateTime := time.Time{}
	for i := range podList.Items {
		// Index into the slice instead of copying each (large) Pod value.
		pod := &podList.Items[i]
		// StartTime is a pointer that stays nil until the pod has been started.
		if pod.Status.StartTime == nil {
			return fmt.Errorf("pod %s of app %s has no start time", pod.Name, appInfo.AppID)
		}
		if len(pod.Spec.Containers) == 0 {
			return fmt.Errorf("pod %s of app %s has no containers", pod.Name, appInfo.AppID)
		}
		createTime := pod.CreationTimestamp.Time
		startTime := pod.Status.StartTime.Time
		requestResources := ParseResourceFromResourceList(&pod.Spec.Containers[0].Resources.Requests)
		// init conditions map: two synthetic conditions plus the pod's own ones
		condMap := make(map[TaskConditionType]*TaskCondition)
		condMap[PodCreated] = &TaskCondition{
			CondType:       PodCreated,
			TransitionTime: createTime,
		}
		condMap[PodStarted] = &TaskCondition{
			CondType:       PodStarted,
			TransitionTime: startTime,
		}
		for _, cond := range pod.Status.Conditions {
			condMap[TaskConditionType(cond.Type)] = &TaskCondition{
				CondType:       TaskConditionType(cond.Type),
				TransitionTime: cond.LastTransitionTime.Time,
			}
		}
		// set running time from the last condition (ContainersReady)
		readyCond, ok := condMap[ContainersReady]
		if !ok {
			return fmt.Errorf("unexpected conditions for pod %s of app %s: condition %v is missing",
				pod.Name, appInfo.AppID, ContainersReady)
		}
		runningTime := readyCond.TransitionTime
		// transfer to ordered conditions
		conditions := make([]*TaskCondition, len(orderedCondTypes))
		for idx, condType := range orderedCondTypes {
			cond, found := condMap[condType]
			if !found {
				return fmt.Errorf("unknown condition %v for pod %s of app %s",
					condType, pod.Name, appInfo.AppID)
			}
			conditions[idx] = cond
		}
		taskStatus := NewTaskStatus(pod.Name, pod.Spec.NodeName,
			createTime, runningTime, requestResources, conditions)
		tasksStatus[taskStatus.TaskID] = taskStatus
		// update maxRunningTime
		if runningTime.After(maxRunningTime) {
			maxRunningTime = runningTime
		}
		if firstCreateTime.IsZero() || createTime.Before(firstCreateTime) {
			firstCreateTime = createTime
		}
	}
	appInfo.TasksStatus = tasksStatus
	appInfo.AppStatus.RunningTime = maxRunningTime
	appInfo.AppStatus.CreateTime = firstCreateTime
	return nil
}
// WaitForAppsToBeCleanedUp polls the app status once per second until the
// desired, created and ready counts are all zero, or until timeout elapses.
// While waiting, a progress message is logged roughly once per minute.
//
// It returns nil once the app is cleaned up, a timeout error when the deadline
// passes first, and the refresh error when the app status cannot be refreshed
// (polling stops immediately in that case instead of reporting success).
func (dam *DeploymentsAppManager) WaitForAppsToBeCleanedUp(appInfo *AppInfo, timeout time.Duration) error {
	startTime := time.Now()
	logRound := 1
	var refreshErr error
	waitErr := waitForCondition(func() bool {
		if refreshErr = dam.RefreshAppStatus(appInfo); refreshErr != nil {
			// Stop polling; the error is reported after the wait returns.
			return true
		}
		if appInfo.AppStatus.DesiredNum != 0 || appInfo.AppStatus.CreatedNum != 0 || appInfo.AppStatus.ReadyNum != 0 {
			// Throttle the progress log to one message per elapsed minute.
			if time.Since(startTime) > 60*time.Duration(logRound)*time.Second {
				utils.Logger.Info("still waiting for app to be cleaned up",
					zap.String("appID", appInfo.AppID),
					zap.Duration("timeout", timeout),
					zap.Duration("elapseTime", time.Since(startTime)),
					zap.Int("desiredNum", appInfo.AppStatus.DesiredNum),
					zap.Int("createdNum", appInfo.AppStatus.CreatedNum),
					zap.Int("readyNum", appInfo.AppStatus.ReadyNum))
				logRound++
			}
			return false
		}
		utils.Logger.Info("app is cleaned up", zap.String("appID", appInfo.AppID),
			zap.Any("appStatus", appInfo.AppStatus))
		return true
	}, 1*time.Second, timeout)
	if refreshErr != nil {
		return fmt.Errorf("failed to refresh status of app %s: %w", appInfo.AppID, refreshErr)
	}
	return waitErr
}
// WaitForAppsToBeSatisfied polls the app status once per second until the
// desired count is non-zero and equals the ready count, or until timeout
// elapses. While waiting, a progress message is logged roughly every five
// seconds.
//
// It returns nil once the app is satisfied, a timeout error when the deadline
// passes first, and the refresh error when the app status cannot be refreshed
// (polling stops immediately in that case instead of reporting success).
func (dam *DeploymentsAppManager) WaitForAppsToBeSatisfied(appInfo *AppInfo, timeout time.Duration) error {
	startTime := time.Now()
	logRound := 1
	var refreshErr error
	waitErr := waitForCondition(func() bool {
		if refreshErr = dam.RefreshAppStatus(appInfo); refreshErr != nil {
			// Stop polling; the error is reported after the wait returns.
			return true
		}
		if appInfo.AppStatus.DesiredNum == 0 || appInfo.AppStatus.DesiredNum != appInfo.AppStatus.ReadyNum {
			// Throttle the progress log to one message per elapsed 5 seconds.
			if time.Since(startTime) > 5*time.Duration(logRound)*time.Second {
				utils.Logger.Info("still waiting for app to be running",
					zap.String("appID", appInfo.AppID),
					zap.Duration("timeout", timeout),
					zap.Duration("elapseTime", time.Since(startTime)),
					zap.Int("desiredNum", appInfo.AppStatus.DesiredNum),
					zap.Int("readyNum", appInfo.AppStatus.ReadyNum))
				logRound++
			}
			return false
		}
		return true
	}, 1*time.Second, timeout)
	if refreshErr != nil {
		return fmt.Errorf("failed to refresh status of app %s: %w", appInfo.AppID, refreshErr)
	}
	return waitErr
}
// CreateWaitAndRefreshTasksStatus creates the app, waits up to timeout for all
// of its pods to be ready, and then refreshes the per-task status in appInfo.
// The first failing step aborts the sequence; its error is wrapped with %w so
// callers can still inspect the cause with errors.Is / errors.As.
func (dam *DeploymentsAppManager) CreateWaitAndRefreshTasksStatus(schedulerName string, appInfo *AppInfo,
	timeout time.Duration) error {
	if err := dam.Create(schedulerName, appInfo); err != nil {
		return fmt.Errorf("failed to create app: %w", err)
	}
	// wait for this app to be running (all pods are scheduled to be running)
	if err := dam.WaitForAppsToBeSatisfied(appInfo, timeout); err != nil {
		return fmt.Errorf("failed to wait for this app to be running: %w", err)
	}
	// refresh task status
	if err := dam.RefreshTasksStatusAfterRunning(appInfo); err != nil {
		return fmt.Errorf("failed to refresh task status: %w", err)
	}
	return nil
}
// DeleteWait deletes the app and waits up to timeout for it to be cleaned up.
// The error of the failing step is wrapped with %w so callers can still
// inspect the cause with errors.Is / errors.As.
func (dam *DeploymentsAppManager) DeleteWait(appInfo *AppInfo,
	timeout time.Duration) error {
	if err := dam.Delete(appInfo); err != nil {
		return fmt.Errorf("failed to delete app: %w", err)
	}
	// wait for this app to be cleaned up
	if err := dam.WaitForAppsToBeCleanedUp(appInfo, timeout); err != nil {
		return fmt.Errorf("failed to wait for this app to be cleaned up: %w", err)
	}
	return nil
}
// waitForCondition polls eval until it reports true, sleeping interval between
// attempts. eval is always invoked at least once; if it is still false once
// the deadline (now + timeout) has passed, a timeout error is returned.
//
// Copied from yunikorn-k8shim to avoid importing too many dependencies.
func waitForCondition(eval func() bool, interval time.Duration, timeout time.Duration) error {
	expiry := time.Now().Add(timeout)
	for !eval() {
		if time.Now().After(expiry) {
			return fmt.Errorf("timeout waiting for condition")
		}
		time.Sleep(interval)
	}
	return nil
}