pkg/jobmgr/goalstate/job_actions.go (325 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package goalstate
import (
"context"
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/pkg/common/goalstate"
"github.com/uber/peloton/pkg/common/util"
"github.com/uber/peloton/pkg/jobmgr/cached"
jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common"
updateutil "github.com/uber/peloton/pkg/jobmgr/util/update"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/yarpc/yarpcerrors"
)
// _errTasksNotInCache is an internal (yarpc INTERNAL code) error signalling
// that some tasks of a job were not found in the cache. It is not referenced
// in this part of the file; presumably other actions in the package return
// it — verify before changing.
var _errTasksNotInCache = yarpcerrors.InternalErrorf("some tasks not in cache")
// JobUntrack deletes the job and tasks from the goal state engine and the cache.
//
// If the job is not cached there is nothing to do. If reading the job config
// fails with anything other than NotFound, that error is returned and the job
// stays tracked. Service jobs are never untracked: for them JobRuntimeUpdater
// is invoked instead. Every other job, including one whose config is missing,
// is untracked. Each of its tasks and then the job itself are removed from the
// goal state engine, and the job is finally cleared from the job factory cache.
func JobUntrack(ctx context.Context, entity goalstate.Entity) error {
	ent := entity.(*jobEntity)
	gsDriver := ent.driver

	cachedJob := gsDriver.jobFactory.GetJob(ent.id)
	if cachedJob == nil {
		return nil
	}

	cfg, err := cachedJob.GetConfig(ctx)
	switch {
	case err != nil && !yarpcerrors.IsNotFound(err):
		return err
	case err == nil && cfg.GetType() == job.JobType_SERVICE:
		// service jobs are always active and never untracked.
		// Call runtime updater, because job runtime can change
		// when an update is running on the job.
		return JobRuntimeUpdater(ctx, entity)
	}

	// A NotFound config falls through: such a job is untracked as well.
	// Goal state engine first (tasks, then the job) ...
	for instanceID := range cachedJob.GetAllTasks() {
		gsDriver.DeleteTask(ent.id, instanceID)
	}
	gsDriver.DeleteJob(ent.id)

	// ... then the cache.
	gsDriver.jobFactory.ClearJob(ent.id)
	return nil
}
// JobStateInvalid reports that the job's (state, goal state) combination is
// not valid.
//
// It logs an error carrying the job's current state, goal state and ID, then
// increments the JobInvalidState metric. The original doc describes this as a
// sentry error; presumably the logger forwards errors to Sentry. The function
// returns nil when the job is not cached. It returns the runtime lookup error
// if reading the runtime fails.
func JobStateInvalid(ctx context.Context, entity goalstate.Entity) error {
	ent := entity.(*jobEntity)
	gsDriver := ent.driver

	cachedJob := gsDriver.jobFactory.GetJob(ent.id)
	if cachedJob == nil {
		return nil
	}

	rt, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		return err
	}

	fields := log.Fields{
		"current_state": rt.State.String(),
		"goal_state":    rt.GoalState.String(),
		"job_id":        ent.GetID(),
	}
	log.WithFields(fields).Error("unexpected job state")

	gsDriver.mtx.jobMetrics.JobInvalidState.Inc(1)
	return nil
}
// JobRecover tries to recover a partially created job.
// If job is not recoverable, it would untrack the job.
//
// The job is first added to the job factory cache. What happens next depends
// on the job config:
//
//   - Config is NotFound: the job was never fully created. The following are
//     deleted in order: its job_index entry, the update referenced by its
//     runtime (if any), the job in the job store, and finally its active-jobs
//     entry. The job is then untracked via JobUntrack. A NotFound runtime is
//     tolerated here.
//   - Config lookup fails with any other error: that error is returned.
//   - Config exists: the runtime state is set to INITIALIZED, or PENDING for
//     service jobs, in both cache and DB. The job is then enqueued into the
//     goal state engine.
func JobRecover(ctx context.Context, entity goalstate.Entity) error {
	ent := entity.(*jobEntity)
	gsDriver := ent.driver
	cachedJob := gsDriver.jobFactory.AddJob(ent.id)

	cfg, cfgErr := cachedJob.GetConfig(ctx)
	if yarpcerrors.IsNotFound(cfgErr) {
		// config is not created, job cannot be recovered.
		log.WithFields(log.Fields{
			"job_id": ent.GetID(),
		}).Info("job is not recoverable due to missing config")

		// runtime may already be removed, ignore not found error here
		rt, err := cachedJob.GetRuntime(ctx)
		if err != nil && !yarpcerrors.IsNotFound(err) {
			return err
		}

		// delete from job_index
		if err = gsDriver.jobIndexOps.Delete(ctx, cachedJob.ID()); err != nil {
			return err
		}

		// remove the update, if the runtime references one
		if updateID := rt.GetUpdateID(); updateID.GetValue() != "" {
			err = gsDriver.updateStore.DeleteUpdate(
				ctx,
				updateID,
				cachedJob.ID(),
				rt.GetConfigurationVersion(),
			)
			if err != nil {
				return err
			}
		}

		if err = gsDriver.jobStore.DeleteJob(ctx, cachedJob.ID().GetValue()); err != nil {
			return err
		}

		// delete from active job in the end, after this step,
		// we would not have any reference to the job.
		if err = gsDriver.activeJobsOps.Delete(ctx, cachedJob.ID()); err != nil {
			return err
		}
		return JobUntrack(ctx, entity)
	}
	if cfgErr != nil {
		return cfgErr
	}

	// config exists, it means the job is created
	log.WithFields(log.Fields{
		"job_id": ent.GetID(),
	}).Info("job config is found and job is recoverable")

	// stateless job uses workflow to create the job,
	// so directly move to PENDING state
	state := job.JobState_INITIALIZED
	if cfg.GetType() == job.JobType_SERVICE {
		state = job.JobState_PENDING
	}

	info := &job.JobInfo{Runtime: &job.RuntimeInfo{State: state}}
	if err := cachedJob.Update(ctx, info, nil, nil, cached.UpdateCacheAndDB); err != nil {
		return err
	}

	gsDriver.EnqueueJob(ent.id, time.Now())
	return nil
}
// DeleteJobFromActiveJobs deletes a terminal batch job from active jobs
// table.
//
// The following jobs are left untouched: jobs that are not cached, jobs that
// are not of type BATCH, and jobs whose runtime state is not terminal. Errors
// from reading the runtime or config, or from the delete itself, are returned.
func DeleteJobFromActiveJobs(
	ctx context.Context, entity goalstate.Entity) error {
	ent := entity.(*jobEntity)
	gsDriver := ent.driver

	cachedJob := gsDriver.jobFactory.GetJob(ent.id)
	if cachedJob == nil {
		return nil
	}

	rt, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		return err
	}
	cfg, err := cachedJob.GetConfig(ctx)
	if err != nil {
		return err
	}

	isBatch := cfg.GetType() == job.JobType_BATCH
	if !isBatch || !util.IsPelotonJobStateTerminal(rt.GetState()) {
		// only terminal batch jobs leave the active jobs table
		return nil
	}
	return gsDriver.activeJobsOps.Delete(ctx, ent.id)
}
// JobStart starts all tasks of the job.
//
// Every cached task gets goal state RUNNING through PatchTasks and is enqueued
// into the goal state engine. The job runtime is then moved to PENDING, and
// its StateVersion is set to its DesiredStateVersion. On
// UnexpectedVersionError (a concurrent modification) the runtime is re-read
// and the write retried, for at most jobmgrcommon.MaxConcurrencyErrorRetry
// attempts in total. On success the job itself is enqueued.
func JobStart(
	ctx context.Context,
	entity goalstate.Entity,
) error {
	ent := entity.(*jobEntity)
	gsDriver := ent.driver

	cachedJob := gsDriver.jobFactory.GetJob(ent.id)
	if cachedJob == nil {
		return nil
	}

	diffs := make(map[uint32]jobmgrcommon.RuntimeDiff)
	for instanceID := range cachedJob.GetAllTasks() {
		diffs[instanceID] = jobmgrcommon.RuntimeDiff{
			jobmgrcommon.GoalStateField: task.TaskState_RUNNING,
		}
	}

	// 'instancesToBeRetried' needs no handling: every task in the diff is
	// requeued below, so the goal state engine retries the action when those
	// tasks are evaluated next time.
	if _, _, err := cachedJob.PatchTasks(ctx, diffs, false); err != nil {
		return err
	}
	for instanceID := range diffs {
		gsDriver.EnqueueTask(cachedJob.ID(), instanceID, time.Now())
	}

	for attempt := 1; ; attempt++ {
		rt, err := cachedJob.GetRuntime(ctx)
		if err != nil {
			return err
		}
		rt.State = job.JobState_PENDING
		rt.StateVersion = rt.GetDesiredStateVersion()

		_, err = cachedJob.CompareAndSetRuntime(ctx, rt)
		if err == jobmgrcommon.UnexpectedVersionError &&
			attempt < jobmgrcommon.MaxConcurrencyErrorRetry {
			// concurrency error; retry MaxConcurrencyErrorRetry times
			continue
		}
		if err != nil {
			return errors.Wrap(err, "fail to update job runtime")
		}

		gsDriver.EnqueueJob(cachedJob.ID(), time.Now())
		return nil
	}
}
// JobDelete deletes a job from cache and DB.
//
// The job is not cached: nothing happens. Otherwise the job is deleted through
// the cached job's Delete, and a failure is returned wrapped without untracking
// anything. After a successful delete, every task and then the job are removed
// from the goal state engine, and the job is cleared from the job factory cache.
func JobDelete(
	ctx context.Context,
	entity goalstate.Entity,
) error {
	ent := entity.(*jobEntity)
	gsDriver := ent.driver

	cachedJob := gsDriver.jobFactory.GetJob(ent.id)
	if cachedJob == nil {
		return nil
	}

	if err := cachedJob.Delete(ctx); err != nil {
		return errors.Wrap(err, "failed to delete job from store")
	}

	// Delete job from goalstate and cache
	jobID := cachedJob.ID()
	for instanceID := range cachedJob.GetAllTasks() {
		gsDriver.DeleteTask(jobID, instanceID)
	}
	gsDriver.DeleteJob(jobID)
	gsDriver.jobFactory.ClearJob(jobID)
	return nil
}
// JobReloadRuntime reloads the job runtime into the cache.
//
// The job is added to the job factory cache (AddJob), and its runtime is read
// through jobRuntimeOps. The result determines what happens next:
//
//   - Runtime is NotFound: the job was only partially created, and JobRecover
//     decides whether it can be recovered.
//   - Read fails with any other error: the error is returned unchanged.
//     Proceeding would push a nil runtime into the cache and enqueue the job.
//   - Read succeeds: the runtime is written to the cache only (not the DB),
//     and the job is enqueued into the goal state engine.
func JobReloadRuntime(
	ctx context.Context,
	entity goalstate.Entity,
) error {
	jobEnt := entity.(*jobEntity)
	goalStateDriver := jobEnt.driver
	cachedJob := goalStateDriver.jobFactory.AddJob(jobEnt.id)

	jobRuntime, err := goalStateDriver.jobRuntimeOps.Get(
		ctx,
		&peloton.JobID{Value: jobEnt.GetID()},
	)
	if yarpcerrors.IsNotFound(err) {
		// runtime is not created, see if config is created and the job is
		// recoverable.
		return JobRecover(ctx, entity)
	}
	if err != nil {
		// A transient read failure must not be treated as a valid (nil)
		// runtime; surface it to the caller instead.
		return err
	}

	err = cachedJob.Update(
		ctx,
		&job.JobInfo{
			Runtime: jobRuntime,
		}, nil,
		nil,
		cached.UpdateCacheOnly,
	)
	if err != nil {
		return err
	}

	goalStateDriver.EnqueueJob(jobEnt.id, time.Now())
	return nil
}
// EnqueueJobUpdate enqueues an ongoing update, if any,
// for a stateless job. It is noop for batch jobs.
//
// Only a cached job of type SERVICE is considered, and only when its runtime
// references an update (updateutil.HasUpdate). That update's progress is read
// from the update store. The update is enqueued only if its state is not
// terminal. Errors from reading config, runtime or update progress are
// returned.
func EnqueueJobUpdate(
	ctx context.Context,
	entity goalstate.Entity,
) error {
	ent := entity.(*jobEntity)
	gsDriver := ent.driver

	cachedJob := gsDriver.jobFactory.GetJob(ent.id)
	if cachedJob == nil {
		return nil
	}

	cfg, err := cachedJob.GetConfig(ctx)
	if err != nil {
		return err
	}
	if cfg.GetType() != job.JobType_SERVICE {
		// only service (stateless) jobs are handled here
		return nil
	}

	rt, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		return err
	}
	if !updateutil.HasUpdate(rt) {
		return nil
	}

	updateID := rt.GetUpdateID()
	progress, err := gsDriver.updateStore.GetUpdateProgress(ctx, updateID)
	if err != nil {
		return err
	}
	if cached.IsUpdateStateTerminal(progress.GetState()) {
		// finished updates need no further processing
		return nil
	}

	gsDriver.EnqueueUpdate(ent.id, updateID, time.Now())
	return nil
}
// JobKillAndDelete terminates each task in job, and makes
// sure tasks would not get restarted. It deletes the job
// if all tasks are in terminated states.
//
// The kill is delegated to killJob, and the job state it returns decides the
// outcome. A terminal state means JobDelete is called. Otherwise the function
// logs and returns nil, waiting for a later evaluation.
func JobKillAndDelete(
	ctx context.Context,
	entity goalstate.Entity,
) error {
	ent := entity.(*jobEntity)
	gsDriver := ent.driver

	cachedJob := gsDriver.jobFactory.GetJob(ent.id)
	if cachedJob == nil {
		return nil
	}

	state, _, err := killJob(ctx, cachedJob, gsDriver)
	if err != nil {
		return err
	}

	if !util.IsPelotonJobStateTerminal(state) {
		log.WithFields(log.Fields{
			"job_id": cachedJob.ID(),
			"state":  state.String(),
		}).Info("some tasks are not killed, waiting to kill before deleting")
		return nil
	}

	// job is in terminal state now, can safely delete the job
	log.WithField("job_id", cachedJob.ID()).
		Info("all tasks are killed, deleting the job")
	return JobDelete(ctx, entity)
}
// JobKillAndUntrack kills all of the pods in the job, makes sure they would
// not start again and untrack the job if possible.
//
// Tasks are stopped through stopTasks. If its first result is empty, the job
// is untracked via JobUntrack; going by the original variable name, that
// result holds the runtime diffs of tasks not yet terminated. Otherwise the
// function logs and returns nil, waiting for a later evaluation.
func JobKillAndUntrack(ctx context.Context, entity goalstate.Entity) error {
	ent := entity.(*jobEntity)
	gsDriver := ent.driver

	cachedJob := gsDriver.jobFactory.GetJob(ent.id)
	if cachedJob == nil {
		return nil
	}

	stillRunning, _, err := stopTasks(ctx, cachedJob, gsDriver)
	if err != nil {
		return err
	}

	if len(stillRunning) != 0 {
		log.WithFields(log.Fields{
			"job_id": cachedJob.ID(),
		}).Info("some tasks are not killed, waiting to kill before untracking")
		return nil
	}

	log.WithField("job_id", cachedJob.ID()).
		Info("all tasks are killed, untrack the job")
	return JobUntrack(ctx, entity)
}