in pkg/jobmgr/goalstate/update_run.go [41:188]
// UpdateRun executes one pass over the update workflow referenced by entity.
//
// The pass, in order:
//  1. loads the workflow and job via fetchWorkflowAndJobFromCache;
//  2. hands off to UpdateReload when the workflow state is State_INVALID;
//  3. refreshes progress with cached.GetUpdateProgress and merges the newly
//     reported done/failed instances into the lists held by the workflow;
//  4. calls processFailedUpdate and returns if MaxFailureInstances is set
//     and the failed count has reached it;
//  5. otherwise selects instances (getInstancesForUpdateRun), confirms them
//     (confirmInstancesStatus), then runs processUpdate, writeUpdateProgress
//     and postUpdateAction.
//
// Every error return increments the UpdateRunFail counter; a completed pass
// increments UpdateRun. The result of UpdateReload is returned as-is without
// touching either counter.
func UpdateRun(ctx context.Context, entity goalstate.Entity) error {
	// Unchecked assertion: panics if entity is not an *updateEntity.
	ent := entity.(*updateEntity)
	driver := ent.driver

	// countFailure bumps the failure counter and hands the error back
	// unchanged, so each failure path collapses into a single return.
	countFailure := func(e error) error {
		driver.mtx.updateMetrics.UpdateRunFail.Inc(1)
		return e
	}

	workflow, job, err := fetchWorkflowAndJobFromCache(
		ctx, ent.jobID, ent.id, driver)
	if err != nil || workflow == nil || job == nil {
		// err can be nil here when only the workflow or the job is missing;
		// the failure counter is still incremented and nil is returned.
		logFields := log.Fields{
			"update_id": ent.id.GetValue(),
		}
		log.WithFields(logFields).WithError(err).Info("unable to run update")
		return countFailure(err)
	}

	// TODO: remove after recovery is done when reading state
	if state := workflow.GetState().State; state == pbupdate.State_INVALID {
		return UpdateReload(ctx, entity)
	}

	// Arguments are captured in the same left-to-right order in which the
	// call would evaluate them.
	jobID := job.ID()
	goalVersion := workflow.GetGoalState().JobVersion
	lastCurrent := workflow.GetInstancesCurrent()
	current, doneLastRun, failedLastRun, err := cached.GetUpdateProgress(
		ctx,
		jobID,
		workflow,
		goalVersion,
		lastCurrent,
		driver.taskStore,
	)
	if err != nil {
		return countFailure(err)
	}

	// Extend the lists recorded on the workflow with what the progress
	// refresh just reported.
	failed := workflow.GetInstancesFailed()
	failed = append(failed, failedLastRun...)
	done := workflow.GetInstancesDone()
	done = append(done, doneLastRun...)

	// number of failed instances in the workflow exceeds limit and
	// max instance retries is set, process the failed workflow and
	// return directly
	// TODO: use job SLA if GetMaxFailureInstances is not set
	failureLimitHit := workflow.GetUpdateConfig().GetMaxFailureInstances() != 0 &&
		uint32(len(failed)) >= workflow.GetUpdateConfig().GetMaxFailureInstances()
	if failureLimitHit {
		failErr := processFailedUpdate(
			ctx, job, workflow, done, failed, current, driver)
		if failErr != nil {
			return countFailure(failErr)
		}
		return failErr
	}

	// First pick candidate instances for this pass, then let
	// confirmInstancesStatus narrow them down; it also reports removals
	// that are to be counted as done.
	plannedAdd, plannedUpdate, plannedRemove := getInstancesForUpdateRun(
		ctx, job, workflow, current, done, failed)

	toAdd, toUpdate, toRemove, removedDone, err := confirmInstancesStatus(
		ctx, job, workflow, plannedAdd, plannedUpdate, plannedRemove)
	if err != nil {
		return countFailure(err)
	}
	done = append(done, removedDone...)

	err = processUpdate(
		ctx, job, workflow, toAdd, toUpdate, toRemove, driver)
	if err != nil {
		return countFailure(err)
	}

	err = writeUpdateProgress(
		ctx,
		job,
		workflow,
		workflow.GetState().State,
		done,
		failed,
		current,
		toAdd,
		toUpdate,
		toRemove,
	)
	if err != nil {
		return countFailure(err)
	}

	err = postUpdateAction(
		ctx, job, workflow, toAdd, toUpdate, toRemove, done, failed, driver)
	if err != nil {
		return countFailure(err)
	}

	// TODO (varung):
	// - Use len for instances current
	// - Remove instances_added, instances_removed and instances_updated
	summary := log.Fields{
		"update_id":         ent.id.GetValue(),
		"job_id":            job.ID().GetValue(),
		"update_type":       workflow.GetWorkflowType().String(),
		"instances_current": workflow.GetInstancesCurrent(),
		"instances_failed":  len(workflow.GetInstancesFailed()),
		"instances_done":    len(workflow.GetInstancesDone()),
		"instances_added":   len(workflow.GetInstancesAdded()),
		"instances_removed": len(workflow.GetInstancesRemoved()),
		"instances_updated": len(workflow.GetInstancesUpdated()),
	}
	log.WithFields(summary).Info("update running")

	driver.mtx.updateMetrics.UpdateRun.Inc(1)
	return nil
}