func()

in pkg/jobmgr/cached/job.go [778:922]


// RollingCreate creates the job by running an update workflow. The workflow
// starts from a "dummy" config (a clone of config with zero instances, no
// default/instance configs, and version jobmgrcommon.DummyConfigVersion) and
// targets config, adding every instance index in [0, config.InstanceCount).
//
// The whole sequence runs while holding the job lock:
//   - validate arguments;
//   - register the job in the active jobs table;
//   - create the job runtime (recording the update ID so the update config can
//     be recovered on error retry);
//   - for SERVICE jobs, create the job name -> job ID mapping;
//   - create the update workflow;
//   - persist the dummy config (tolerating AlreadyExists) and then the real
//     config;
//   - move the runtime to PENDING and persist it;
//   - create the job index entry;
//   - cache the workflow and snapshot the job summary.
//
// Once the real config is persisted the job is considered created.
// Calling RollingCreate again after that point returns an AlreadyExists
// error.
//
// An InvalidArgument error is returned when config is nil or when
// updateConfig asks for rollback on failure. Any error from the storage steps
// invalidates the cached job before being returned unchanged.
//
// Listeners are notified through jobFactory.notifyJobSummaryChanged after the
// lock is released. This happens on every return path, including errors.
// NOTE(review): on error paths the summary and update model passed are nil;
// presumably notifyJobSummaryChanged tolerates that — confirm.
func (j *job) RollingCreate(
	ctx context.Context,
	config *pbjob.JobConfig,
	configAddOn *models.ConfigAddOn,
	spec *stateless.JobSpec,
	updateConfig *pbupdate.UpdateConfig,
	opaqueData *peloton.OpaqueData,
) error {
	var (
		jobTypeCopy     pbjob.JobType
		jobSummaryCopy  *pbjob.JobSummary
		updateModelCopy *models.UpdateModel
	)
	// Registered before Lock: deferred calls run LIFO, so listeners are
	// notified only after the Unlock below has released the lock.
	defer func() {
		j.jobFactory.notifyJobSummaryChanged(
			j.ID(),
			jobTypeCopy,
			jobSummaryCopy,
			updateModelCopy,
		)
	}()

	j.Lock()
	defer j.Unlock()

	if config == nil {
		return yarpcerrors.InvalidArgumentErrorf("missing config in jobInfo")
	}

	if updateConfig.GetRollbackOnFailure() {
		return yarpcerrors.InvalidArgumentErrorf("job creation cannot rollback on failure")
	}

	// fail drops the cached job state, which the steps below may have
	// partially mutated. It then returns err unchanged. It is only used after
	// argument validation, and it runs while the lock is still held.
	fail := func(err error) error {
		j.invalidateCache()
		return err
	}

	// Add jobID to active jobs table before creating job runtime. This should
	// happen every time a job is first created.
	if err := j.jobFactory.activeJobsOps.Create(ctx, j.ID()); err != nil {
		return fail(err)
	}

	config = populateConfigChangeLog(config)

	// The dummy config is the starting config for the update workflow:
	// the same job config, but with no instances and the reserved dummy version.
	dummyConfig := proto.Clone(config).(*pbjob.JobConfig)
	dummyConfig.InstanceCount = 0
	dummyConfig.ChangeLog.Version = jobmgrcommon.DummyConfigVersion
	dummyConfig.DefaultConfig = nil
	dummyConfig.InstanceConfig = nil

	// The workflow adds every instance of the target config.
	instancesAdded := make([]uint32, config.InstanceCount)
	for i := range instancesAdded {
		instancesAdded[i] = uint32(i)
	}

	// ID of the workflow that is going to initialize the job.
	updateID := &peloton.UpdateID{Value: uuid.New()}

	// Create the job runtime and set its state to UNINITIALIZED with updateID,
	// so that on error recovery the update config (e.g. batch size) can be
	// recovered.
	if err := j.createJobRuntime(ctx, config, updateID); err != nil {
		return fail(err)
	}

	// Create the job name to job id mapping. If this fails, the job config
	// has not been created yet, so the goalstate engine's JobRecover action
	// cleans the job up.
	if config.GetType() == pbjob.JobType_SERVICE {
		if err := j.jobFactory.jobNameToIDOps.Create(
			ctx,
			config.GetName(),
			j.ID(),
		); err != nil {
			return fail(err)
		}
	}

	newWorkflow := newUpdate(updateID, j.jobFactory)
	if err := newWorkflow.Create(
		ctx,
		j.id,
		config,
		dummyConfig,
		configAddOn,
		instancesAdded,
		nil,
		nil,
		models.WorkflowType_UPDATE,
		updateConfig,
		opaqueData,
	); err != nil {
		return fail(err)
	}

	// Persist the dummy config. On an error retry it may already exist in
	// the db, so an AlreadyExists error is ignored here.
	if err := j.createJobConfig(ctx, dummyConfig, configAddOn, nil); err != nil &&
		!yarpcerrors.IsAlreadyExists(errors.Cause(err)) {
		return fail(err)
	}

	// Persist the real config as the target config of the update workflow.
	// Once this succeeds the job is considered created and must be able to
	// recover from any later error. A repeated RollingCreate after this point
	// results in an AlreadyExists error.
	if err := j.createJobConfig(ctx, config, configAddOn, spec); err != nil {
		return fail(err)
	}
	jobTypeCopy = j.jobType

	// Both config and runtime now exist, so move the state to PENDING.
	j.runtime.State = pbjob.JobState_PENDING
	if err := j.jobFactory.jobRuntimeOps.Upsert(
		ctx,
		j.id,
		j.runtime,
	); err != nil {
		return fail(err)
	}

	if err := j.jobFactory.jobIndexOps.Create(
		ctx,
		j.id,
		config,
		j.runtime,
	); err != nil {
		return fail(err)
	}

	j.workflows[updateID.GetValue()] = newWorkflow

	// Snapshot the job summary and update model while still holding the lock.
	// The deferred notifier publishes them after the lock is released.
	jobSummaryCopy, updateModelCopy = j.generateJobSummaryFromCache(j.runtime, updateID)

	return nil
}