func()

in pkg/jobmgr/jobsvc/handler.go [119:251]


func (h *serviceHandler) Create(
	ctx context.Context,
	req *job.CreateRequest) (resp *job.CreateResponse, err error) {
	defer func() {
		jobID := req.GetId().GetValue()
		instanceCount := req.GetConfig().GetInstanceCount()
		headers := yarpcutil.GetHeaders(ctx)

		if err != nil || resp.GetError() != nil {
			entry := log.WithField("job_id", jobID).
				WithField("instance_count", instanceCount).
				WithField("headers", headers)

			if err != nil {
				entry = entry.WithError(err)
			}

			if resp.GetError() != nil {
				entry = entry.WithField("create_error", resp.GetError().String())
			}

			entry.Warn("JobManager.CreateJob failed")
			return
		}

		log.WithField("job_id", jobID).
			WithField("response", resp).
			WithField("instance_count", instanceCount).
			WithField("headers", headers).
			Info("JobSVC.CreateJob succeeded")
	}()

	h.metrics.JobAPICreate.Inc(1)

	if !h.candidate.IsLeader() {
		h.metrics.JobCreateFail.Inc(1)
		return nil, yarpcerrors.UnavailableErrorf(
			"Job Create API not suppported on non-leader")
	}

	jobID := req.GetId()
	// It is possible that jobId is nil since protobuf doesn't enforce it
	if jobID == nil || len(jobID.GetValue()) == 0 {
		jobID = &peloton.JobID{Value: uuid.New()}
	}

	if uuid.Parse(jobID.GetValue()) == nil {
		log.WithField("job_id", jobID.GetValue()).Warn("JobID is not valid UUID")
		h.metrics.JobCreateFail.Inc(1)
		return &job.CreateResponse{
			Error: &job.CreateResponse_Error{
				InvalidJobId: &job.InvalidJobId{
					Id:      jobID,
					Message: "JobID must be valid UUID",
				},
			},
		}, nil
	}

	jobConfig := req.GetConfig()

	respoolPath, err := h.validateResourcePool(jobConfig.GetRespoolID())
	if err != nil {
		h.metrics.JobCreateFail.Inc(1)
		return &job.CreateResponse{
			Error: &job.CreateResponse_Error{
				InvalidConfig: &job.InvalidJobConfig{
					Id:      jobID,
					Message: err.Error(),
				},
			},
		}, nil
	}

	// Validate job config with default task configs
	err = jobconfig.ValidateConfig(jobConfig, h.jobSvcCfg.MaxTasksPerJob)
	if err != nil {
		h.metrics.JobCreateFail.Inc(1)
		return &job.CreateResponse{
			Error: &job.CreateResponse_Error{
				InvalidConfig: &job.InvalidJobConfig{
					Id:      jobID,
					Message: err.Error(),
				},
			},
		}, nil
	}

	// check secrets and config for input sanity
	if err = h.validateSecretsAndConfig(
		jobConfig, req.GetSecrets()); err != nil {
		return &job.CreateResponse{}, err
	}

	// create secrets in the DB and add them as secret volumes to defaultconfig
	err = h.handleCreateSecrets(ctx, jobID, jobConfig, req.GetSecrets())
	if err != nil {
		h.metrics.JobCreateFail.Inc(1)
		return &job.CreateResponse{}, err
	}

	// Create job in cache and db
	cachedJob := h.jobFactory.AddJob(jobID)

	systemLabels := jobutil.ConstructSystemLabels(jobConfig, respoolPath.GetValue())
	configAddOn := &models.ConfigAddOn{
		SystemLabels: systemLabels,
	}
	err = cachedJob.Create(ctx, jobConfig, configAddOn, nil)
	// if err is not nil, still enqueue to goal state engine,
	// because job may be partially created. Goal state engine
	// knows if the job can be recovered
	h.goalStateDriver.EnqueueJob(jobID, time.Now())

	if err != nil {
		h.metrics.JobCreateFail.Inc(1)
		return &job.CreateResponse{
			Error: &job.CreateResponse_Error{
				AlreadyExists: &job.JobAlreadyExists{
					Id:      req.Id,
					Message: err.Error(),
				},
			},
			JobId: jobID, // should return the jobID even when error occurs
			// because the job may be running
		}, nil
	}
	h.metrics.JobCreate.Inc(1)

	return &job.CreateResponse{
		JobId: jobID,
	}, nil
}