pkg/jobmgr/goalstate/update_start.go (173 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package goalstate import ( "context" "time" "github.com/uber/peloton/.gen/peloton/api/v0/peloton" "github.com/uber/peloton/.gen/peloton/api/v0/update" "github.com/uber/peloton/.gen/peloton/api/v1alpha/job/stateless" "github.com/uber/peloton/.gen/peloton/private/models" "github.com/uber/peloton/pkg/common" "github.com/uber/peloton/pkg/common/goalstate" "github.com/uber/peloton/pkg/jobmgr/cached" jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common" log "github.com/sirupsen/logrus" ) // fetchWorkflowAndJobFromCache is a helper function to fetch the // workflow and the job from the cache for a given update entity. func fetchWorkflowAndJobFromCache( ctx context.Context, jobID *peloton.JobID, updateID *peloton.UpdateID, goalStateDriver *driver, ) (cachedWorkflow cached.Update, cachedJob cached.Job, err error) { // first fetch the job cachedJob = goalStateDriver.jobFactory.GetJob(jobID) if cachedJob == nil { // if job has been untracked, cancel the update and then enqueue into // goal state to untrack it. log.WithFields( log.Fields{ "job_id": jobID.GetValue(), "update_id": updateID.GetValue(), }).Info("job has been deleted, so canceling the update as well") cachedJob = goalStateDriver.jobFactory.AddJob(jobID) err = cachedJob.AddWorkflow(updateID).Cancel(ctx, nil) if err == nil { goalStateDriver.EnqueueUpdate(jobID, updateID, time.Now()) } // clean up the job since it is untracked before goalStateDriver.EnqueueJob(jobID, time.Now()) cachedJob = nil return } cachedWorkflow = cachedJob.AddWorkflow(updateID) return } // handleUnchangedInstancesInUpdate updates the runtime state of the // instances left unchanged with the given update; essentially, the // configuration and desired configuration version of all unchanged // tasks is updated to the newest version. func handleUnchangedInstancesInUpdate( ctx context.Context, cachedUpdate cached.Update, cachedJob cached.Job, jobConfig jobmgrcommon.JobConfig) error { runtimes := make(map[uint32]jobmgrcommon.RuntimeDiff) instanceCount := jobConfig.GetInstanceCount() instancesTotal := cachedUpdate.GetGoalState().Instances for i := uint32(0); i < instanceCount; i++ { // first find the instances which have not been updated found := false for _, j := range instancesTotal { if i == j { // instance has been updated found = true break } } if found == false { // instance is left unchanged with this update runtimeDiff := jobmgrcommon.RuntimeDiff{ jobmgrcommon.ConfigVersionField: jobConfig.GetChangeLog().GetVersion(), jobmgrcommon.DesiredConfigVersionField: jobConfig.GetChangeLog().GetVersion(), } runtimes[i] = runtimeDiff } } if len(runtimes) > 0 { // Just update the runtime of the tasks with the // new version and move on. _, instancesToBeRetried, err := cachedJob.PatchTasks(ctx, runtimes, false) if err != nil { return err } // if some patching of some instances need to be retried, return an // error here so that the UpdateStart action is retried. if len(instancesToBeRetried) != 0 { return _errTasksNotInCache } } return nil } // UpdateStart initializes the update. It will move the configuration version // of the tasks which are not touched by this update to the new version. // Then it will move the update state to ROLLING_FORWARD and enqueue to // goal state engine to start the rolling update process. func UpdateStart(ctx context.Context, entity goalstate.Entity) error { updateEnt := entity.(*updateEntity) goalStateDriver := updateEnt.driver // fetch the update and job from the cache cachedWorkflow, cachedJob, err := fetchWorkflowAndJobFromCache( ctx, updateEnt.jobID, updateEnt.id, goalStateDriver) if err != nil || cachedWorkflow == nil || cachedJob == nil { log.WithFields(log.Fields{ "update_id": updateEnt.id.GetValue(), }).WithError(err).Info("unable to start update") goalStateDriver.mtx.updateMetrics.UpdateRunFail.Inc(1) return err } if cachedWorkflow.GetState().State == update.State_INVALID { return UpdateReload(ctx, entity) } jobID := cachedWorkflow.JobID() // fetch the job configuration first obj, err := goalStateDriver.jobConfigOps.GetResult( ctx, jobID, cachedWorkflow.GetGoalState().JobVersion) if err != nil { goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1) return err } jobConfig := obj.JobConfig configAddOn := obj.ConfigAddOn var spec *stateless.JobSpec if obj.ApiVersion == common.V1AlphaApi { spec = obj.JobSpec } // lets write the new task configs first if err := cachedJob.CreateTaskConfigs( ctx, jobID, jobConfig, configAddOn, spec); err != nil { goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1) return err } if cachedWorkflow.GetWorkflowType() == models.WorkflowType_UPDATE { // Populate instancesAdded, instancesUpdated and instancesRemoved // by the update. This is not done in the handler because the previous // update may be running when this current update was created, and // hence the instances in this list may have changed. So do in start // to ensure that these list of instances remain the same // while the update is non-terminal. prevJobConfig, _, err := goalStateDriver.jobConfigOps.Get( ctx, jobID, cachedWorkflow.GetState().JobVersion, ) if err != nil { goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1) return err } instancesAdded, instancesUpdated, instancesRemoved, _, err := cached.GetInstancesToProcessForUpdate( ctx, cachedJob.ID(), prevJobConfig, jobConfig, goalStateDriver.taskStore, goalStateDriver.taskConfigV2Ops, ) if err != nil { goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1) return err } if err := cachedWorkflow.Modify( ctx, instancesAdded, instancesUpdated, instancesRemoved, ); err != nil { goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1) return err } } // update the configuration and desired configuration version of // all instances which do not need to be updated if err = handleUnchangedInstancesInUpdate( ctx, cachedWorkflow, cachedJob, jobConfig); err != nil { goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1) return err } // update the state of the job update if err = cachedJob.WriteWorkflowProgress( ctx, updateEnt.id, update.State_ROLLING_FORWARD, []uint32{}, []uint32{}, []uint32{}, ); err != nil { goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1) return err } log.WithFields(log.Fields{ "update_id": updateEnt.id.GetValue(), "job_id": cachedJob.ID().GetValue(), "update_type": cachedWorkflow.GetWorkflowType().String(), "instances_added": len(cachedWorkflow.GetInstancesAdded()), "instances_removed": len(cachedWorkflow.GetInstancesRemoved()), "instances_updated": len(cachedWorkflow.GetInstancesUpdated()), }).Info("update starting") goalStateDriver.EnqueueUpdate(jobID, updateEnt.id, time.Now()) goalStateDriver.mtx.updateMetrics.UpdateStart.Inc(1) return nil }