pkg/jobmgr/goalstate/job_kill.go (167 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package goalstate import ( "context" "time" "github.com/uber/peloton/.gen/peloton/api/v0/job" "github.com/uber/peloton/.gen/peloton/api/v0/peloton" "github.com/uber/peloton/.gen/peloton/api/v0/task" "github.com/uber/peloton/pkg/common/goalstate" "github.com/uber/peloton/pkg/common/util" "github.com/uber/peloton/pkg/jobmgr/cached" jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "go.uber.org/yarpc/yarpcerrors" ) // JobKill will stop all tasks in the job. func JobKill(ctx context.Context, entity goalstate.Entity) error { id := entity.GetID() jobID := &peloton.JobID{Value: id} goalStateDriver := entity.(*jobEntity).driver cachedJob := goalStateDriver.jobFactory.GetJob(jobID) if cachedJob == nil { return nil } jobState, nonTerminalTaskKilled, err := killJob(ctx, cachedJob, goalStateDriver) if err != nil { return err } // Only enqueue the job into goal state: // 1. any of the non terminated tasks need to be killed. // 2. job state is already KILLED if nonTerminalTaskKilled || util.IsPelotonJobStateTerminal(jobState) { EnqueueJobWithDefaultDelay(jobID, goalStateDriver, cachedJob) } log.WithField("job_id", id). Info("initiated kill of all tasks in the job") return nil } // createRuntimeDiffForKill creates the runtime diffs to kill the tasks in job. // it returns: // runtimeDiffNonTerminatedTasks which is used to kill non-terminated tasks, // runtimeDiffTerminatedTasks which is used to kill tasks already terminal // state (to prevent restart), // runtimeDiffAll which is a union of runtimeDiffNonTerminatedTasks and // runtimeDiffTerminatedTasks func createRuntimeDiffForKill( ctx context.Context, cachedJob cached.Job, ) ( runtimeDiffNonTerminatedTasks map[uint32]jobmgrcommon.RuntimeDiff, runtimeDiffTerminatedTasks map[uint32]jobmgrcommon.RuntimeDiff, runtimeDiffAll map[uint32]jobmgrcommon.RuntimeDiff, err error, ) { runtimeDiffNonTerminatedTasks = make(map[uint32]jobmgrcommon.RuntimeDiff) runtimeDiffTerminatedTasks = make(map[uint32]jobmgrcommon.RuntimeDiff) runtimeDiffAll = make(map[uint32]jobmgrcommon.RuntimeDiff) tasks := cachedJob.GetAllTasks() for instanceID, cachedTask := range tasks { runtime, err := cachedTask.GetRuntime(ctx) // runtime not created yet, ignore the task if yarpcerrors.IsNotFound(err) { continue } if err != nil { log.WithError(err). WithField("job_id", cachedJob.ID().Value). WithField("instance_id", instanceID). Info("failed to fetch task runtime to kill a job") return nil, nil, nil, err } // A task in terminal state can be running later due to failure // retry (batch job) or task restart (stateless job), so it is // necessary to kill a task even if it is in terminal state as // long as the goal state is not KILLED. if runtime.GetGoalState() == task.TaskState_KILLED { continue } runtimeDiff := jobmgrcommon.RuntimeDiff{ jobmgrcommon.GoalStateField: task.TaskState_KILLED, jobmgrcommon.MessageField: "Task stop API request", jobmgrcommon.ReasonField: "", jobmgrcommon.TerminationStatusField: &task.TerminationStatus{ Reason: task.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_ON_REQUEST, }, jobmgrcommon.DesiredHostField: "", } runtimeDiffAll[instanceID] = runtimeDiff if util.IsPelotonStateTerminal(runtime.GetState()) { runtimeDiffTerminatedTasks[instanceID] = runtimeDiff } else { runtimeDiffNonTerminatedTasks[instanceID] = runtimeDiff } } return runtimeDiffNonTerminatedTasks, runtimeDiffTerminatedTasks, runtimeDiffAll, nil } // calculateJobState calculates if the job to be killed is // in KILLING state or KILLED state func calculateJobState( ctx context.Context, cachedJob cached.Job, jobRuntime *job.RuntimeInfo, runtimeDiffNonTerminatedTasks map[uint32]jobmgrcommon.RuntimeDiff) job.JobState { // if no non-terminal instance is killed, check if the job // should directly enter KILLED state if len(runtimeDiffNonTerminatedTasks) == 0 { if cachedJob.GetJobType() == job.JobType_BATCH && jobRuntime.GetState() != job.JobState_INITIALIZED { return job.JobState_KILLING } for _, cachedTask := range cachedJob.GetAllTasks() { runtime, err := cachedTask.GetRuntime(ctx) if err != nil || !util.IsPelotonStateTerminal(runtime.GetState()) { return job.JobState_KILLING } } return job.JobState_KILLED } return job.JobState_KILLING } // killJob kills all tasks in the job, and returns // 1. the new job state after the kill // 2. whether any non-terminated task is killed func killJob( ctx context.Context, cachedJob cached.Job, goalStateDriver Driver, ) (newState job.JobState, taskKilled bool, err error) { // Get job runtime and update job state to killing jobRuntime, err := cachedJob.GetRuntime(ctx) if err != nil { err = errors.Wrap(err, "failed to get job runtime during job kill") return } runtimeDiffNonTerminatedTasks, allTasksMarked, err := stopTasks(ctx, cachedJob, goalStateDriver) if err != nil { err = errors.Wrap(err, "failed to update task runtimes to kill a job") return } // if the goal state of some tasks could not be patched, do not update // the state of the job. We should update the job state to KILLING only // when the goalstate of all tasks have been set to KILLED. if !allTasksMarked { return jobRuntime.GetState(), false, nil } jobState := calculateJobState( ctx, cachedJob, jobRuntime, runtimeDiffNonTerminatedTasks, ) runtimeUpdate := &job.RuntimeInfo{ State: jobState, StateVersion: jobRuntime.DesiredStateVersion, } if util.IsPelotonJobStateTerminal(jobState) { runtimeUpdate.CompletionTime = time.Now().UTC().Format(time.RFC3339Nano) } // update job state as well as state version, // once state version == desired state version, // goal state engine knows that the all the tasks // are being sent to task goal state engine to kill and // no further action is needed. err = cachedJob.Update(ctx, &job.JobInfo{ Runtime: runtimeUpdate, }, nil, nil, cached.UpdateCacheAndDB) if err != nil { err = errors.Wrap(err, "failed to update job runtime during job kill") return } return jobState, len(runtimeDiffNonTerminatedTasks) > 0, err } func stopTasks( ctx context.Context, cachedJob cached.Job, goalStateDriver Driver, ) ( runtimeDiffNonTerminatedTasks map[uint32]jobmgrcommon.RuntimeDiff, allTasksMarked bool, err error, ) { // Update task runtimes in DB and cache to kill task runtimeDiffNonTerminatedTasks, _, runtimeDiffAll, err := createRuntimeDiffForKill(ctx, cachedJob) if err != nil { return nil, false, err } _, instancesToBeRetried, err := cachedJob.PatchTasks(ctx, runtimeDiffAll, false) // Schedule non terminated tasks in goal state engine. // This should happen even if PatchTasks fail, so if part of // the tasks are updated successfully, those tasks can be // terminated. Otherwise, those tasks would not be enqueued // into goal state engine in JobKill retry. for instanceID := range runtimeDiffNonTerminatedTasks { goalStateDriver.EnqueueTask(cachedJob.ID(), instanceID, time.Now()) } // If patching of few non terminal tasks failed, enqueue // the job so that the action is retried if len(instancesToBeRetried) != 0 { EnqueueJobWithDefaultDelay(cachedJob.ID(), goalStateDriver, cachedJob) return runtimeDiffNonTerminatedTasks, false, err } return runtimeDiffNonTerminatedTasks, true, err }