in pkg/jobmgr/jobsvc/stateless/handler.go [128:258]
// CreateJob handles a request to create a stateless job.
//
// The request is rejected with an Unavailable error when this instance is
// not the leader. If the request carries no job ID, a new UUID is generated;
// a job ID that is not a valid UUID is rejected with an InvalidArgument
// error. The job spec is then checked against its resource pool, converted
// for the thermos executor, converted to a job config and validated, and the
// request secrets are validated and handled before the job is added to the
// job factory and created with RollingCreate.
//
// The job is enqueued into the goal state driver whether or not
// RollingCreate succeeds.
//
// On success the response carries the job ID and the entity version built
// from the job runtime. On failure resp is nil, and the deferred function
// logs the error and converts it with yarpcutil.ConvertToYARPCError before
// it is returned to the caller.
func (h *serviceHandler) CreateJob(
	ctx context.Context,
	req *svc.CreateJobRequest,
) (resp *svc.CreateJobResponse, err error) {
	// jobID is captured by the deferred logger. It starts as the ID supplied
	// in the request and is overwritten below when the handler generates one,
	// so that both the failure and success logs report the effective job ID
	// instead of an empty string.
	jobID := req.GetJobId().GetValue()

	defer func() {
		specVersion := req.GetSpec().GetRevision().GetVersion()
		instanceCount := req.GetSpec().GetInstanceCount()
		headers := yarpcutil.GetHeaders(ctx)

		if err != nil {
			log.WithField("job_id", jobID).
				WithField("spec_version", specVersion).
				WithField("instance_count", instanceCount).
				WithField("headers", headers).
				WithError(err).
				Warn("JobSVC.CreateJob failed")
			// Reassigning the named result changes what the caller receives.
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}

		log.WithField("job_id", jobID).
			WithField("spec_version", specVersion).
			WithField("response", resp).
			WithField("instance_count", instanceCount).
			WithField("headers", headers).
			Info("JobSVC.CreateJob succeeded")
	}()

	if !h.candidate.IsLeader() {
		return nil,
			yarpcerrors.UnavailableErrorf("JobSVC.CreateJob is not supported on non-leader")
	}

	pelotonJobID := &peloton.JobID{Value: jobID}

	// It is possible that jobId is nil since protobuf doesn't enforce it
	if len(pelotonJobID.GetValue()) == 0 {
		pelotonJobID = &peloton.JobID{Value: uuid.New()}
		// Make the generated ID visible to the deferred logger.
		jobID = pelotonJobID.GetValue()
	}

	if uuid.Parse(pelotonJobID.GetValue()) == nil {
		return nil, yarpcerrors.InvalidArgumentErrorf("jobID is not valid UUID")
	}

	jobSpec := req.GetSpec()

	respoolPath, err := h.validateResourcePoolForJobCreation(ctx, jobSpec.GetRespoolId())
	if err != nil {
		return nil, errors.Wrap(err, "failed to validate resource pool")
	}

	jobSpec, err = handlerutil.ConvertForThermosExecutor(
		jobSpec,
		h.jobSvcCfg.ThermosExecutor,
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to convert for thermos executor")
	}

	jobConfig, err := api.ConvertJobSpecToJobConfig(jobSpec)
	if err != nil {
		return nil, errors.Wrap(err, "failed to convert job spec")
	}

	// Validate job config with default task configs
	err = jobconfig.ValidateConfig(
		jobConfig,
		h.jobSvcCfg.MaxTasksPerJob,
	)
	if err != nil {
		return nil, errors.Wrap(err, "invalid job spec")
	}

	// check secrets and config for input sanity
	if err = h.validateSecretsAndConfig(jobSpec, req.GetSecrets()); err != nil {
		return nil, errors.Wrap(err, "input cannot contain secret volume")
	}

	// create secrets in the DB and add them as secret volumes to defaultconfig
	err = h.handleCreateSecrets(ctx, pelotonJobID.GetValue(), jobSpec, req.GetSecrets())
	if err != nil {
		return nil, errors.Wrap(err, "failed to handle create-secrets")
	}

	// Create job in cache and db
	cachedJob := h.jobFactory.AddJob(pelotonJobID)

	systemLabels := jobutil.ConstructSystemLabels(jobConfig, respoolPath.GetValue())
	configAddOn := &models.ConfigAddOn{
		SystemLabels: systemLabels,
	}

	// Opaque data is only forwarded when the request actually carries some;
	// otherwise a nil pointer is passed to RollingCreate.
	var opaqueData *peloton.OpaqueData
	if len(req.GetOpaqueData().GetData()) != 0 {
		opaqueData = &peloton.OpaqueData{
			Data: req.GetOpaqueData().GetData(),
		}
	}

	err = cachedJob.RollingCreate(
		ctx,
		jobConfig,
		configAddOn,
		jobSpec,
		api.ConvertCreateSpecToUpdateConfig(req.GetCreateSpec()),
		opaqueData,
	)

	// enqueue the job into goal state engine even in failure case.
	// Because the state may be updated, let goal state engine decide what to do
	h.goalStateDriver.EnqueueJob(pelotonJobID, time.Now())

	if err != nil {
		return nil, errors.Wrap(err, "failed to create job in db")
	}

	runtimeInfo, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get job runtime from cache")
	}

	return &svc.CreateJobResponse{
		JobId: &v1alphapeloton.JobID{Value: pelotonJobID.GetValue()},
		Version: versionutil.GetJobEntityVersion(
			runtimeInfo.GetConfigurationVersion(),
			runtimeInfo.GetDesiredStateVersion(),
			runtimeInfo.GetWorkflowVersion(),
		),
	}, nil
}