func UpdateStart()

in pkg/jobmgr/goalstate/update_start.go [127:250]


func UpdateStart(ctx context.Context, entity goalstate.Entity) error {
	updateEnt := entity.(*updateEntity)
	goalStateDriver := updateEnt.driver

	// fetch the update and job from the cache
	cachedWorkflow, cachedJob, err := fetchWorkflowAndJobFromCache(
		ctx, updateEnt.jobID, updateEnt.id, goalStateDriver)
	if err != nil || cachedWorkflow == nil || cachedJob == nil {
		log.WithFields(log.Fields{
			"update_id": updateEnt.id.GetValue(),
		}).WithError(err).Info("unable to start update")
		goalStateDriver.mtx.updateMetrics.UpdateRunFail.Inc(1)
		return err
	}

	if cachedWorkflow.GetState().State == update.State_INVALID {
		return UpdateReload(ctx, entity)
	}

	jobID := cachedWorkflow.JobID()
	// fetch the job configuration first
	obj, err := goalStateDriver.jobConfigOps.GetResult(
		ctx,
		jobID,
		cachedWorkflow.GetGoalState().JobVersion)
	if err != nil {
		goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
		return err
	}
	jobConfig := obj.JobConfig
	configAddOn := obj.ConfigAddOn

	var spec *stateless.JobSpec
	if obj.ApiVersion == common.V1AlphaApi {
		spec = obj.JobSpec
	}

	// lets write the new task configs first
	if err := cachedJob.CreateTaskConfigs(
		ctx,
		jobID,
		jobConfig,
		configAddOn,
		spec); err != nil {
		goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
		return err
	}

	if cachedWorkflow.GetWorkflowType() == models.WorkflowType_UPDATE {
		// Populate instancesAdded, instancesUpdated and instancesRemoved
		// by the update. This is not done in the handler because the previous
		// update may be running when this current update was created, and
		// hence the instances in this list may have changed. So do in start
		// to ensure that these list of instances remain the same
		// while the update is non-terminal.

		prevJobConfig, _, err := goalStateDriver.jobConfigOps.Get(
			ctx,
			jobID,
			cachedWorkflow.GetState().JobVersion,
		)
		if err != nil {
			goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
			return err
		}

		instancesAdded, instancesUpdated, instancesRemoved, _, err := cached.GetInstancesToProcessForUpdate(
			ctx,
			cachedJob.ID(),
			prevJobConfig,
			jobConfig,
			goalStateDriver.taskStore,
			goalStateDriver.taskConfigV2Ops,
		)
		if err != nil {
			goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
			return err
		}

		if err := cachedWorkflow.Modify(
			ctx,
			instancesAdded,
			instancesUpdated,
			instancesRemoved,
		); err != nil {
			goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
			return err
		}
	}

	// update the configuration and desired configuration version of
	// all instances which do not need to be updated
	if err = handleUnchangedInstancesInUpdate(
		ctx, cachedWorkflow, cachedJob, jobConfig); err != nil {
		goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
		return err
	}

	// update the state of the job update
	if err = cachedJob.WriteWorkflowProgress(
		ctx,
		updateEnt.id,
		update.State_ROLLING_FORWARD,
		[]uint32{},
		[]uint32{},
		[]uint32{},
	); err != nil {
		goalStateDriver.mtx.updateMetrics.UpdateStartFail.Inc(1)
		return err
	}

	log.WithFields(log.Fields{
		"update_id":         updateEnt.id.GetValue(),
		"job_id":            cachedJob.ID().GetValue(),
		"update_type":       cachedWorkflow.GetWorkflowType().String(),
		"instances_added":   len(cachedWorkflow.GetInstancesAdded()),
		"instances_removed": len(cachedWorkflow.GetInstancesRemoved()),
		"instances_updated": len(cachedWorkflow.GetInstancesUpdated()),
	}).Info("update starting")

	goalStateDriver.EnqueueUpdate(jobID, updateEnt.id, time.Now())
	goalStateDriver.mtx.updateMetrics.UpdateStart.Inc(1)
	return nil
}