in pkg/jobmgr/jobsvc/handler.go [119:251]
// Create handles a create-job request.
//
// When this instance is not the leader, the request is rejected with a yarpc
// Unavailable error. If the request carries no job ID, a new UUID is
// generated; a supplied ID must parse as a UUID. The resource pool, the job
// config (bounded by h.jobSvcCfg.MaxTasksPerJob) and the secrets are
// validated, the secrets are persisted, and the job is then created through
// the job factory's cached job and enqueued to the goal state engine.
//
// Invalid-ID, invalid-config and creation failures are reported in
// resp.Error with a nil err. Leadership and secret-handling failures are
// returned as err. Every outcome is logged when the method returns.
func (h *serviceHandler) Create(
	ctx context.Context,
	req *job.CreateRequest) (resp *job.CreateResponse, err error) {
	defer func() {
		// Prefer the ID echoed in the response: when the caller omitted the
		// ID, the request carries none but the generated one is returned.
		jobID := req.GetId().GetValue()
		if respID := resp.GetJobId().GetValue(); respID != "" {
			jobID = respID
		}
		instanceCount := req.GetConfig().GetInstanceCount()
		headers := yarpcutil.GetHeaders(ctx)

		if err != nil || resp.GetError() != nil {
			entry := log.WithField("job_id", jobID).
				WithField("instance_count", instanceCount).
				WithField("headers", headers)
			if err != nil {
				entry = entry.WithError(err)
			}
			if resp.GetError() != nil {
				entry = entry.WithField("create_error", resp.GetError().String())
			}
			entry.Warn("JobManager.CreateJob failed")
			return
		}

		log.WithField("job_id", jobID).
			WithField("response", resp).
			WithField("instance_count", instanceCount).
			WithField("headers", headers).
			Info("JobSVC.CreateJob succeeded")
	}()

	h.metrics.JobAPICreate.Inc(1)

	if !h.candidate.IsLeader() {
		h.metrics.JobCreateFail.Inc(1)
		return nil, yarpcerrors.UnavailableErrorf(
			"Job Create API not suppported on non-leader")
	}

	jobID := req.GetId()
	// It is possible that jobId is nil since protobuf doesn't enforce it.
	if jobID == nil || len(jobID.GetValue()) == 0 {
		jobID = &peloton.JobID{Value: uuid.New()}
	}

	if uuid.Parse(jobID.GetValue()) == nil {
		log.WithField("job_id", jobID.GetValue()).Warn("JobID is not valid UUID")
		h.metrics.JobCreateFail.Inc(1)
		return &job.CreateResponse{
			Error: &job.CreateResponse_Error{
				InvalidJobId: &job.InvalidJobId{
					Id:      jobID,
					Message: "JobID must be valid UUID",
				},
			},
		}, nil
	}

	// invalidConfig builds the response for a job-config validation failure.
	invalidConfig := func(cause error) *job.CreateResponse {
		return &job.CreateResponse{
			Error: &job.CreateResponse_Error{
				InvalidConfig: &job.InvalidJobConfig{
					Id:      jobID,
					Message: cause.Error(),
				},
			},
		}
	}

	jobConfig := req.GetConfig()

	respoolPath, err := h.validateResourcePool(jobConfig.GetRespoolID())
	if err != nil {
		h.metrics.JobCreateFail.Inc(1)
		return invalidConfig(err), nil
	}

	// Validate job config with default task configs.
	if err = jobconfig.ValidateConfig(jobConfig, h.jobSvcCfg.MaxTasksPerJob); err != nil {
		h.metrics.JobCreateFail.Inc(1)
		return invalidConfig(err), nil
	}

	// Check secrets and config for input sanity. This is a create failure
	// like every other path above, so it must be counted as one.
	if err = h.validateSecretsAndConfig(jobConfig, req.GetSecrets()); err != nil {
		h.metrics.JobCreateFail.Inc(1)
		return &job.CreateResponse{}, err
	}

	// Create secrets in the DB and add them as secret volumes to defaultconfig.
	if err = h.handleCreateSecrets(ctx, jobID, jobConfig, req.GetSecrets()); err != nil {
		h.metrics.JobCreateFail.Inc(1)
		return &job.CreateResponse{}, err
	}

	// Create job in cache and db.
	cachedJob := h.jobFactory.AddJob(jobID)
	configAddOn := &models.ConfigAddOn{
		SystemLabels: jobutil.ConstructSystemLabels(jobConfig, respoolPath.GetValue()),
	}
	err = cachedJob.Create(ctx, jobConfig, configAddOn, nil)

	// If err is not nil, still enqueue to goal state engine, because the job
	// may be partially created. Goal state engine knows if the job can be
	// recovered.
	h.goalStateDriver.EnqueueJob(jobID, time.Now())

	if err != nil {
		h.metrics.JobCreateFail.Inc(1)
		return &job.CreateResponse{
			Error: &job.CreateResponse_Error{
				AlreadyExists: &job.JobAlreadyExists{
					// Use the effective ID rather than req.Id, which is nil
					// when the ID was generated above.
					Id:      jobID,
					Message: err.Error(),
				},
			},
			// Return the jobID even when an error occurs, because the job
			// may be running.
			JobId: jobID,
		}, nil
	}

	h.metrics.JobCreate.Inc(1)
	return &job.CreateResponse{
		JobId: jobID,
	}, nil
}