in pkg/jobmgr/goalstate/update_start.go [127:250]
func UpdateStart(ctx context.Context, entity goalstate.Entity) error {
updateEnt := entity.(*updateEntity)
goalStateDriver := updateEnt.driver
// fetch the update and job from the cache
cachedWorkflow, cachedJob, err := fetchWorkflowAndJobFromCache(
ctx, updateEnt.jobID, updateEnt.id, goalStateDriver)
if err != nil || cachedWorkflow == nil || cachedJob == nil {
log.WithFields(log.Fields{
"update_id": updateEnt.id.GetValue(),
}).WithError(err).Info("unable to start update")
goalStateDriver.mtx.updateMetrics.UpdateRunFail.Inc(1)
return err
}
if cachedWorkflow.GetState().State == update.State_INVALID {
return UpdateReload(ctx, entity)
}
jobID := cachedWorkflow.JobID()
// fetch the job configuration first
obj, err := goalStateDriver.jobConfigOps.GetResult(
ctx,
jobID,
cachedWorkflow.GetGoalState().JobVersion)
if err != nil {
goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
return err
}
jobConfig := obj.JobConfig
configAddOn := obj.ConfigAddOn
var spec *stateless.JobSpec
if obj.ApiVersion == common.V1AlphaApi {
spec = obj.JobSpec
}
// lets write the new task configs first
if err := cachedJob.CreateTaskConfigs(
ctx,
jobID,
jobConfig,
configAddOn,
spec); err != nil {
goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
return err
}
if cachedWorkflow.GetWorkflowType() == models.WorkflowType_UPDATE {
// Populate instancesAdded, instancesUpdated and instancesRemoved
// by the update. This is not done in the handler because the previous
// update may be running when this current update was created, and
// hence the instances in this list may have changed. So do in start
// to ensure that these list of instances remain the same
// while the update is non-terminal.
prevJobConfig, _, err := goalStateDriver.jobConfigOps.Get(
ctx,
jobID,
cachedWorkflow.GetState().JobVersion,
)
if err != nil {
goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
return err
}
instancesAdded, instancesUpdated, instancesRemoved, _, err := cached.GetInstancesToProcessForUpdate(
ctx,
cachedJob.ID(),
prevJobConfig,
jobConfig,
goalStateDriver.taskStore,
goalStateDriver.taskConfigV2Ops,
)
if err != nil {
goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
return err
}
if err := cachedWorkflow.Modify(
ctx,
instancesAdded,
instancesUpdated,
instancesRemoved,
); err != nil {
goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
return err
}
}
// update the configuration and desired configuration version of
// all instances which do not need to be updated
if err = handleUnchangedInstancesInUpdate(
ctx, cachedWorkflow, cachedJob, jobConfig); err != nil {
goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
return err
}
// update the state of the job update
if err = cachedJob.WriteWorkflowProgress(
ctx,
updateEnt.id,
update.State_ROLLING_FORWARD,
[]uint32{},
[]uint32{},
[]uint32{},
); err != nil {
goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
return err
}
log.WithFields(log.Fields{
"update_id": updateEnt.id.GetValue(),
"job_id": cachedJob.ID().GetValue(),
"update_type": cachedWorkflow.GetWorkflowType().String(),
"instances_added": len(cachedWorkflow.GetInstancesAdded()),
"instances_removed": len(cachedWorkflow.GetInstancesRemoved()),
"instances_updated": len(cachedWorkflow.GetInstancesUpdated()),
}).Info("update starting")
goalStateDriver.EnqueueUpdate(jobID, updateEnt.id, time.Now())
goalStateDriver.mtx.updateMetrics.UpdateStart.Inc(1)
return nil
}