func()

in pkg/jobmgr/jobsvc/stateless/handler.go [128:258]


func (h *serviceHandler) CreateJob(
	ctx context.Context,
	req *svc.CreateJobRequest,
) (resp *svc.CreateJobResponse, err error) {
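	// Log the result and convert any error to a YARPC error when the handler returns.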
	defer func() {
		jobID := req.GetJobId().GetValue()
		specVersion := req.GetSpec().GetRevision().GetVersion()
		instanceCount := req.GetSpec().GetInstanceCount()
		headers := yarpcutil.GetHeaders(ctx)

		if err != nil {
			log.WithField("job_id", jobID).
				WithField("spec_version", specVersion).
				WithField("instance_count", instanceCount).
				WithField("headers", headers).
				WithError(err).
				Warn("JobSVC.CreateJob failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}

		log.WithField("job_id", jobID).
			WithField("spec_version", specVersion).
			WithField("response", resp).
			WithField("instance_count", instanceCount).
			WithField("headers", headers).
			Info("JobSVC.CreateJob succeeded")
	}()

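	// Job writes are only served by the leader job manager instance.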
	if !h.candidate.IsLeader() {
		return nil,
			yarpcerrors.UnavailableErrorf("JobSVC.CreateJob is not supported on non-leader")
	}

	pelotonJobID := &peloton.JobID{Value: req.GetJobId().GetValue()}

	// It is possible that jobId is nil since protobuf doesn't enforce it
	if len(pelotonJobID.GetValue()) == 0 {
		pelotonJobID = &peloton.JobID{Value: uuid.New()}
	}

	if uuid.Parse(pelotonJobID.GetValue()) == nil {
		return nil, yarpcerrors.InvalidArgumentErrorf("jobID is not a valid UUID")
	}

	jobSpec := req.GetSpec()

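	// Validate the resource pool the job will be created in; the resolved
	// respool path is also used below to build the system labels.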
	respoolPath, err := h.validateResourcePoolForJobCreation(ctx, jobSpec.GetRespoolId())
	if err != nil {
		return nil, errors.Wrap(err, "failed to validate resource pool")
	}

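	// Convert the spec to run with the Thermos executor, based on the configured executor settings.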
	jobSpec, err = handlerutil.ConvertForThermosExecutor(
		jobSpec,
		h.jobSvcCfg.ThermosExecutor,
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to convert for thermos executor")
	}

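	// Convert the v1alpha job spec into the internal job config used for validation and creation.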
	jobConfig, err := api.ConvertJobSpecToJobConfig(jobSpec)
	if err != nil {
		return nil, errors.Wrap(err, "failed to convert job spec")
	}

	// Validate job config with default task configs
	err = jobconfig.ValidateConfig(
		jobConfig,
		h.jobSvcCfg.MaxTasksPerJob,
	)
	if err != nil {
		return nil, errors.Wrap(err, "invalid job spec")
	}

	// check secrets and config for input sanity
	if err = h.validateSecretsAndConfig(jobSpec, req.GetSecrets()); err != nil {
		return nil, errors.Wrap(err, "input cannot contain secret volume")
	}

	// create secrets in the DB and add them as secret volumes to the default config
	err = h.handleCreateSecrets(ctx, pelotonJobID.GetValue(), jobSpec, req.GetSecrets())
	if err != nil {
		return nil, errors.Wrap(err, "failed to handle create-secrets")
	}

	// Create job in cache and db
	cachedJob := h.jobFactory.AddJob(pelotonJobID)

	systemLabels := jobutil.ConstructSystemLabels(jobConfig, respoolPath.GetValue())
	configAddOn := &models.ConfigAddOn{
		SystemLabels: systemLabels,
	}

	var opaqueData *peloton.OpaqueData
	if len(req.GetOpaqueData().GetData()) != 0 {
		opaqueData = &peloton.OpaqueData{
			Data: req.GetOpaqueData().GetData(),
		}
	}

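	// RollingCreate persists the job and rolls out its instances according to the create spec.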
	err = cachedJob.RollingCreate(
		ctx,
		jobConfig,
		configAddOn,
		jobSpec,
		api.ConvertCreateSpecToUpdateConfig(req.GetCreateSpec()),
		opaqueData,
	)

	// Enqueue the job into the goal state engine even in the failure case,
	// because the state may have been updated; let the goal state engine decide what to do.
	h.goalStateDriver.EnqueueJob(pelotonJobID, time.Now())

	if err != nil {
		return nil, errors.Wrap(err, "failed to create job in db")
	}

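	// Read the job runtime back to construct the entity version returned to the caller.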
	runtimeInfo, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get job runtime from cache")
	}

	return &svc.CreateJobResponse{
		JobId: &v1alphapeloton.JobID{Value: pelotonJobID.GetValue()},
		Version: versionutil.GetJobEntityVersion(
			runtimeInfo.GetConfigurationVersion(),
			runtimeInfo.GetDesiredStateVersion(),
			runtimeInfo.GetWorkflowVersion(),
		),
	}, nil
}
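
The job-ID handling above (default to a freshly generated UUID when the request leaves the ID empty, then reject anything that does not parse as a UUID) can be exercised on its own. Below is a minimal sketch of that pattern, assuming the pborman/uuid package whose New/Parse semantics the handler's calls match; normalizeJobID is a hypothetical helper, not part of the handler.

package main

import (
	"fmt"

	"github.com/pborman/uuid"
)

// normalizeJobID mirrors the handler's ID handling: generate a UUID when the
// caller did not supply one, and reject anything that does not parse as a UUID.
func normalizeJobID(requested string) (string, error) {
	if len(requested) == 0 {
		requested = uuid.New()
	}
	if uuid.Parse(requested) == nil {
		return "", fmt.Errorf("jobID %q is not a valid UUID", requested)
	}
	return requested, nil
}

func main() {
	generated, _ := normalizeJobID("")     // empty request ID: a fresh UUID is generated
	echoed, _ := normalizeJobID(generated) // a valid UUID is returned unchanged
	_, err := normalizeJobID("not-a-uuid") // anything else is rejected
	fmt.Println(generated, echoed, err)
}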