// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jobsvc
import (
"context"
"encoding/base64"
"fmt"
"time"
mesos "github.com/uber/peloton/.gen/mesos/v1"
apierrors "github.com/uber/peloton/.gen/peloton/api/v0/errors"
"github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/query"
"github.com/uber/peloton/.gen/peloton/api/v0/respool"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
pbupdate "github.com/uber/peloton/.gen/peloton/api/v0/update"
"github.com/uber/peloton/.gen/peloton/private/models"
"github.com/uber/peloton/.gen/peloton/private/resmgrsvc"
"github.com/uber/peloton/pkg/common"
"github.com/uber/peloton/pkg/common/leader"
"github.com/uber/peloton/pkg/common/util"
versionutil "github.com/uber/peloton/pkg/common/util/entityversion"
yarpcutil "github.com/uber/peloton/pkg/common/util/yarpc"
"github.com/uber/peloton/pkg/jobmgr/cached"
"github.com/uber/peloton/pkg/jobmgr/goalstate"
"github.com/uber/peloton/pkg/jobmgr/job/config"
jobmgrtask "github.com/uber/peloton/pkg/jobmgr/task"
"github.com/uber/peloton/pkg/jobmgr/util/handler"
jobutil "github.com/uber/peloton/pkg/jobmgr/util/job"
"github.com/uber/peloton/pkg/storage"
ormobjects "github.com/uber/peloton/pkg/storage/objects"
"github.com/pborman/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/uber-go/tally"
"go.uber.org/yarpc"
"go.uber.org/yarpc/yarpcerrors"
)
var (
errNullResourcePoolID = errors.New("resource pool ID is null")
errResourcePoolNotFound = errors.New("resource pool not found")
errRootResourcePoolID = errors.New("cannot submit jobs to the `root` resource pool")
errNonLeafResourcePool = errors.New("cannot submit jobs to a non-leaf " +
"resource pool")
)
// InitServiceHandler initializes the job manager service handler and
// registers it with the dispatcher
func InitServiceHandler(
d *yarpc.Dispatcher,
parent tally.Scope,
jobStore storage.JobStore,
taskStore storage.TaskStore,
ormStore *ormobjects.Store,
jobFactory cached.JobFactory,
goalStateDriver goalstate.Driver,
candidate leader.Candidate,
clientName string,
jobSvcCfg Config) {
jobSvcCfg.normalize()
handler := &serviceHandler{
jobStore: jobStore,
taskStore: taskStore,
activeJobsOps: ormobjects.NewActiveJobsOps(ormStore),
jobIndexOps: ormobjects.NewJobIndexOps(ormStore),
jobConfigOps: ormobjects.NewJobConfigOps(ormStore),
jobRuntimeOps: ormobjects.NewJobRuntimeOps(ormStore),
secretInfoOps: ormobjects.NewSecretInfoOps(ormStore),
respoolClient: respool.NewResourceManagerYARPCClient(d.ClientConfig(clientName)),
resmgrClient: resmgrsvc.NewResourceManagerServiceYARPCClient(d.ClientConfig(clientName)),
rootCtx: context.Background(),
jobFactory: jobFactory,
goalStateDriver: goalStateDriver,
candidate: candidate,
metrics: NewMetrics(parent.SubScope("jobmgr").SubScope("job")),
jobSvcCfg: jobSvcCfg,
}
d.Register(job.BuildJobManagerYARPCProcedures(handler))
}
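// A minimal wiring sketch for this handler; the dispatcher, stores, scope,
// and outbound client name below are illustrative assumptions, not values
// defined in this package:
//
//	d := yarpc.NewDispatcher(yarpc.Config{Name: "peloton-jobmgr"})
//	jobsvc.InitServiceHandler(
//		d, rootScope,
//		jobStore, taskStore, ormStore,
//		jobFactory, goalStateDriver, candidate,
//		"peloton-resmgr", // outbound used for the respool/resmgr clients
//		jobsvc.Config{},
//	)
//	// After d.Start(), the JobManager YARPC procedures are served.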
// serviceHandler implements peloton.api.job.JobManager
type serviceHandler struct {
jobStore storage.JobStore
taskStore storage.TaskStore
activeJobsOps ormobjects.ActiveJobsOps
jobIndexOps ormobjects.JobIndexOps
jobConfigOps ormobjects.JobConfigOps
jobRuntimeOps ormobjects.JobRuntimeOps
secretInfoOps ormobjects.SecretInfoOps
respoolClient respool.ResourceManagerYARPCClient
resmgrClient resmgrsvc.ResourceManagerServiceYARPCClient
rootCtx context.Context
jobFactory cached.JobFactory
goalStateDriver goalstate.Driver
candidate leader.Candidate
metrics *Metrics
jobSvcCfg Config
}
// Create creates a job object for a given job configuration and
// enqueues the tasks for scheduling
func (h *serviceHandler) Create(
ctx context.Context,
req *job.CreateRequest) (resp *job.CreateResponse, err error) {
defer func() {
jobID := req.GetId().GetValue()
instanceCount := req.GetConfig().GetInstanceCount()
headers := yarpcutil.GetHeaders(ctx)
if err != nil || resp.GetError() != nil {
entry := log.WithField("job_id", jobID).
WithField("instance_count", instanceCount).
WithField("headers", headers)
if err != nil {
entry = entry.WithError(err)
}
if resp.GetError() != nil {
entry = entry.WithField("create_error", resp.GetError().String())
}
entry.Warn("JobManager.CreateJob failed")
return
}
log.WithField("job_id", jobID).
WithField("response", resp).
WithField("instance_count", instanceCount).
WithField("headers", headers).
Info("JobSVC.CreateJob succeeded")
}()
h.metrics.JobAPICreate.Inc(1)
if !h.candidate.IsLeader() {
h.metrics.JobCreateFail.Inc(1)
return nil, yarpcerrors.UnavailableErrorf(
"Job Create API not suppported on non-leader")
}
jobID := req.GetId()
// It is possible that jobId is nil since protobuf doesn't enforce it
if jobID == nil || len(jobID.GetValue()) == 0 {
jobID = &peloton.JobID{Value: uuid.New()}
}
if uuid.Parse(jobID.GetValue()) == nil {
log.WithField("job_id", jobID.GetValue()).Warn("JobID is not valid UUID")
h.metrics.JobCreateFail.Inc(1)
return &job.CreateResponse{
Error: &job.CreateResponse_Error{
InvalidJobId: &job.InvalidJobId{
Id: jobID,
Message: "JobID must be valid UUID",
},
},
}, nil
}
jobConfig := req.GetConfig()
respoolPath, err := h.validateResourcePool(jobConfig.GetRespoolID())
if err != nil {
h.metrics.JobCreateFail.Inc(1)
return &job.CreateResponse{
Error: &job.CreateResponse_Error{
InvalidConfig: &job.InvalidJobConfig{
Id: jobID,
Message: err.Error(),
},
},
}, nil
}
// Validate job config with default task configs
err = jobconfig.ValidateConfig(jobConfig, h.jobSvcCfg.MaxTasksPerJob)
if err != nil {
h.metrics.JobCreateFail.Inc(1)
return &job.CreateResponse{
Error: &job.CreateResponse_Error{
InvalidConfig: &job.InvalidJobConfig{
Id: jobID,
Message: err.Error(),
},
},
}, nil
}
// check secrets and config for input sanity
if err = h.validateSecretsAndConfig(
jobConfig, req.GetSecrets()); err != nil {
h.metrics.JobCreateFail.Inc(1)
return &job.CreateResponse{}, err
}
// create secrets in the DB and add them as secret volumes to defaultconfig
err = h.handleCreateSecrets(ctx, jobID, jobConfig, req.GetSecrets())
if err != nil {
h.metrics.JobCreateFail.Inc(1)
return &job.CreateResponse{}, err
}
// Create job in cache and db
cachedJob := h.jobFactory.AddJob(jobID)
systemLabels := jobutil.ConstructSystemLabels(jobConfig, respoolPath.GetValue())
configAddOn := &models.ConfigAddOn{
SystemLabels: systemLabels,
}
err = cachedJob.Create(ctx, jobConfig, configAddOn, nil)
// if err is not nil, still enqueue to goal state engine,
// because job may be partially created. Goal state engine
// knows if the job can be recovered
h.goalStateDriver.EnqueueJob(jobID, time.Now())
if err != nil {
h.metrics.JobCreateFail.Inc(1)
return &job.CreateResponse{
Error: &job.CreateResponse_Error{
AlreadyExists: &job.JobAlreadyExists{
Id: req.Id,
Message: err.Error(),
},
},
JobId: jobID, // should return the jobID even when error occurs
// because the job may be running
}, nil
}
h.metrics.JobCreate.Inc(1)
return &job.CreateResponse{
JobId: jobID,
}, nil
}
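// A hypothetical Create call exercising the validation above; the respool
// UUID and instance count are illustrative:
//
//	resp, err := handler.Create(ctx, &job.CreateRequest{
//		Id: &peloton.JobID{Value: uuid.New()},
//		Config: &job.JobConfig{
//			Type:          job.JobType_BATCH,
//			InstanceCount: 10,
//			RespoolID:     &peloton.ResourcePoolID{Value: "<leaf-respool-uuid>"},
//		},
//	})
//
// A nil or empty job ID is replaced with a fresh UUID, while a non-UUID
// job ID is rejected with InvalidJobId.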
// Update updates a job object for a given job configuration and
// performs the appropriate action based on the change
func (h *serviceHandler) Update(
ctx context.Context,
req *job.UpdateRequest) (resp *job.UpdateResponse, err error) {
defer func() {
jobID := req.GetId().GetValue()
headers := yarpcutil.GetHeaders(ctx)
configVersion := req.GetConfig().GetChangeLog().GetVersion()
if err != nil || resp.GetError() != nil {
entry := log.WithField("job_id", jobID).
WithField("headers", headers).
WithField("config_version", configVersion)
if err != nil {
entry = entry.WithError(err)
}
if resp.GetError() != nil {
entry = entry.WithField("update_error", resp.GetError().String())
}
entry.Warn("JobManager.Update failed")
return
}
log.WithField("job_id", jobID).
WithField("response", resp).
WithField("config_version", configVersion).
WithField("headers", headers).
Info("JobManager.Update succeeded")
}()
h.metrics.JobAPIUpdate.Inc(1)
if !h.candidate.IsLeader() {
h.metrics.JobUpdateFail.Inc(1)
return nil, yarpcerrors.UnavailableErrorf(
"Job Update API not suppported on non-leader")
}
jobID := req.GetId()
cachedJob := h.jobFactory.AddJob(jobID)
jobRuntime, err := cachedJob.GetRuntime(ctx)
if err != nil {
log.WithError(err).
WithField("job_id", jobID.GetValue()).
Error("Failed to get runtime")
h.metrics.JobUpdateFail.Inc(1)
return nil, err
}
if util.IsPelotonJobStateTerminal(jobRuntime.State) {
h.metrics.JobUpdateFail.Inc(1)
return nil, yarpcerrors.InvalidArgumentErrorf(
"Job is in a terminal state: %s", jobRuntime.State)
}
newConfig := req.GetConfig()
oldConfig, oldConfigAddOn, err := h.jobConfigOps.Get(
ctx,
jobID,
jobRuntime.GetConfigurationVersion())
if err != nil {
log.WithError(err).
WithField("job_id", jobID.GetValue()).
Error("Failed to GetJobConfig")
h.metrics.JobUpdateFail.Inc(1)
return nil, err
}
if oldConfig.GetType() != job.JobType_BATCH {
return nil, yarpcerrors.InvalidArgumentErrorf(
"job update is only supported for batch jobs")
}
if newConfig.GetRespoolID() == nil {
newConfig.RespoolID = oldConfig.GetRespoolID()
}
// Remove the existing secret volumes from the config. These were added by
// peloton at the time of secret creation. We will add them to new config
// after validating the new config at the time of handling secrets. If we
// keep these volumes in oldConfig, ValidateUpdatedConfig will fail.
existingSecretVolumes := util.RemoveSecretVolumesFromJobConfig(oldConfig)
// check secrets and new config for input sanity
if err := h.validateSecretsAndConfig(newConfig, req.GetSecrets()); err != nil {
return nil, err
}
err = jobconfig.ValidateUpdatedConfig(oldConfig, newConfig, h.jobSvcCfg.MaxTasksPerJob)
if err != nil {
h.metrics.JobUpdateFail.Inc(1)
return nil, yarpcerrors.InvalidArgumentErrorf(err.Error())
}
if err = h.handleUpdateSecrets(ctx, jobID, existingSecretVolumes, newConfig,
req.GetSecrets()); err != nil {
h.metrics.JobUpdateFail.Inc(1)
return nil, err
}
instancesToAdd := newConfig.GetInstanceCount() -
oldConfig.GetInstanceCount()
// A job's secrets can be updated without changing the instance count;
// such an Update must not be treated as a no-op.
if instancesToAdd <= 0 && len(req.GetSecrets()) == 0 {
log.WithField("job_id", jobID.GetValue()).
Info("update is a noop")
return nil, nil
}
var respoolPath string
for _, label := range oldConfigAddOn.GetSystemLabels() {
if label.GetKey() == common.SystemLabelResourcePool {
respoolPath = label.GetValue()
}
}
newConfigAddOn := &models.ConfigAddOn{
SystemLabels: jobutil.ConstructSystemLabels(newConfig, respoolPath),
}
// first persist the configuration
newUpdatedConfig, err := cachedJob.CompareAndSetConfig(
ctx,
mergeInstanceConfig(oldConfig, newConfig),
newConfigAddOn,
nil)
if err != nil {
h.metrics.JobUpdateFail.Inc(1)
return nil, err
}
// next persist the runtime state and the new configuration version
err = cachedJob.Update(ctx, &job.JobInfo{
Runtime: &job.RuntimeInfo{
ConfigurationVersion: newUpdatedConfig.GetChangeLog().GetVersion(),
State: job.JobState_INITIALIZED,
},
}, nil,
nil,
cached.UpdateCacheAndDB)
if err != nil {
h.metrics.JobUpdateFail.Inc(1)
return nil, err
}
h.goalStateDriver.EnqueueJob(jobID, time.Now())
h.metrics.JobUpdate.Inc(1)
msg := fmt.Sprintf("added %d instances", instancesToAdd)
return &job.UpdateResponse{
Id: jobID,
Message: msg,
}, nil
}
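// A hypothetical Update call that grows a batch job by five instances;
// values are illustrative, and per-instance configs for the new instances
// would go into InstanceConfig:
//
//	resp, err := handler.Update(ctx, &job.UpdateRequest{
//		Id: jobID,
//		Config: &job.JobConfig{
//			InstanceCount: oldCount + 5,
//		},
//	})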
// Get returns a job config for a given job ID
func (h *serviceHandler) Get(
ctx context.Context,
req *job.GetRequest) (resp *job.GetResponse, err error) {
defer func() {
jobID := req.GetId().GetValue()
headers := yarpcutil.GetHeaders(ctx)
if err != nil || resp.GetError() != nil {
entry := log.WithField("job_id", jobID).
WithField("headers", headers)
if err != nil {
entry = entry.WithError(err)
}
if resp.GetError() != nil {
entry = entry.WithField("get_error", resp.GetError().String())
}
entry.Warn("JobManager.Get failed")
return
}
log.WithField("job_id", jobID).
WithField("response", resp).
WithField("headers", headers).
Debug("JobManager.Get succeeded")
}()
h.metrics.JobAPIGet.Inc(1)
jobRuntime, err := handler.GetJobRuntimeWithoutFillingCache(
ctx,
req.Id,
h.jobFactory,
h.jobRuntimeOps,
)
if err != nil {
h.metrics.JobGetFail.Inc(1)
log.WithError(err).
WithField("job_id", req.Id.Value).
Debug("failed to get runtime")
return &job.GetResponse{
Error: &job.GetResponse_Error{
GetRuntimeFail: &apierrors.JobGetRuntimeFail{
Id: req.Id,
Message: err.Error(),
},
},
}, nil
}
jobConfig, _, err := h.jobConfigOps.Get(
ctx,
req.GetId(),
jobRuntime.GetConfigurationVersion())
if err != nil {
h.metrics.JobGetFail.Inc(1)
log.WithError(err).
WithField("job_id", req.Id.Value).
Debug("GetJobConfig failed")
return &job.GetResponse{
Error: &job.GetResponse_Error{
NotFound: &apierrors.JobNotFound{
Id: req.Id,
Message: err.Error(),
},
},
}, nil
}
// Do not display the secret volumes in defaultconfig that were added by
// handleSecrets. They should remain internal to peloton logic.
// Secret ID and Path should be returned using the peloton.Secret
// proto message.
secretVolumes := util.RemoveSecretVolumesFromJobConfig(jobConfig)
h.metrics.JobGet.Inc(1)
resp = &job.GetResponse{
JobInfo: &job.JobInfo{
Id: req.GetId(),
Config: jobConfig,
Runtime: jobRuntime,
},
Secrets: jobmgrtask.CreateSecretsFromVolumes(secretVolumes),
}
return resp, nil
}
// Refresh loads the job runtime and config from the DB, updates the cache,
// and enqueues the job to the goal state engine for evaluation.
func (h *serviceHandler) Refresh(ctx context.Context, req *job.RefreshRequest) (resp *job.RefreshResponse, err error) {
defer func() {
jobID := req.GetId().GetValue()
headers := yarpcutil.GetHeaders(ctx)
if err != nil {
log.WithField("job_id", jobID).
WithField("headers", headers).
WithError(err).
Warn("JobManager.Refresh failed")
return
}
log.WithField("job_id", jobID).
WithField("headers", headers).
Debug("JobManager.Refresh succeeded")
}()
h.metrics.JobAPIRefresh.Inc(1)
if !h.candidate.IsLeader() {
h.metrics.JobRefreshFail.Inc(1)
return nil, yarpcerrors.UnavailableErrorf("Job Refresh API not suppported on non-leader")
}
jobRuntime, err := h.jobRuntimeOps.Get(ctx, req.GetId())
if err != nil {
log.WithError(err).
WithField("job_id", req.GetId().GetValue()).
Error("failed to get job runtime in refresh job")
h.metrics.JobRefreshFail.Inc(1)
return &job.RefreshResponse{}, yarpcerrors.NotFoundErrorf("job not found")
}
jobConfig, configAddOn, err := h.jobConfigOps.Get(
ctx,
req.GetId(),
jobRuntime.GetConfigurationVersion())
if err != nil {
log.WithError(err).
WithField("job_id", req.GetId().GetValue()).
Error("failed to get job config in refresh job")
h.metrics.JobRefreshFail.Inc(1)
return &job.RefreshResponse{}, yarpcerrors.NotFoundErrorf("job not found")
}
// Update cache and enqueue job into goal state
cachedJob := h.jobFactory.AddJob(req.GetId())
cachedJob.Update(ctx, &job.JobInfo{
Config: jobConfig,
Runtime: jobRuntime,
}, configAddOn,
nil,
cached.UpdateCacheOnly)
h.goalStateDriver.EnqueueJob(req.GetId(), time.Now())
h.metrics.JobRefresh.Inc(1)
return &job.RefreshResponse{}, nil
}
// Query returns a list of jobs matching the given query.
// The List/Query APIs should not use cachedJob, because the cache
// would not be cleaned up for untracked jobs.
func (h *serviceHandler) Query(ctx context.Context, req *job.QueryRequest) (resp *job.QueryResponse, err error) {
defer func() {
headers := yarpcutil.GetHeaders(ctx)
if err != nil || resp.GetError() != nil {
entry := log.WithField("headers", headers)
if err != nil {
entry = entry.WithError(err)
}
if resp.GetError() != nil {
entry = entry.WithField("query_error", resp.GetError().String())
}
entry.Warn("JobManager.Query failed")
return
}
log.WithField("headers", headers).
WithField("response", resp).
Debug("JobManager.Query succeeded")
}()
h.metrics.JobAPIQuery.Inc(1)
callStart := time.Now()
jobConfigs, jobSummary, total, err := h.jobStore.QueryJobs(ctx, req.GetRespoolID(), req.GetSpec(), req.GetSummaryOnly())
if err != nil {
h.metrics.JobQueryFail.Inc(1)
return &job.QueryResponse{
Error: &job.QueryResponse_Error{
Err: &apierrors.UnknownError{
Message: err.Error(),
},
},
Spec: req.GetSpec(),
}, nil
}
h.metrics.JobQuery.Inc(1)
resp = &job.QueryResponse{
Records: jobConfigs,
Results: jobSummary,
Pagination: &query.Pagination{
Offset: req.GetSpec().GetPagination().GetOffset(),
Limit: req.GetSpec().GetPagination().GetLimit(),
Total: total,
},
Spec: req.GetSpec(),
}
callDuration := time.Since(callStart)
h.metrics.JobQueryHandlerDuration.Record(callDuration)
return resp, nil
}
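// A hypothetical Query call; the pagination message name is an assumption
// based on the v0 query API:
//
//	resp, err := handler.Query(ctx, &job.QueryRequest{
//		Spec: &job.QuerySpec{
//			Pagination: &query.PaginationSpec{Offset: 0, Limit: 100},
//		},
//		SummaryOnly: true,
//	})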
// Delete removes a terminal job's metadata from storage
func (h *serviceHandler) Delete(
ctx context.Context,
req *job.DeleteRequest) (resp *job.DeleteResponse, err error) {
defer func() {
headers := yarpcutil.GetHeaders(ctx)
jobID := req.GetId().GetValue()
if err != nil || resp.GetError() != nil {
entry := log.WithField("job_id", jobID).
WithField("headers", headers)
if err != nil {
entry = entry.WithError(err)
}
if resp.GetError() != nil {
entry = entry.WithField("delete_error", resp.GetError().String())
}
entry.Warn("JobManager.Delete failed")
return
}
log.WithField("job_id", jobID).
WithField("headers", headers).
Info("JobManager.Delete succeeded")
}()
h.metrics.JobAPIDelete.Inc(1)
jobRuntime, err := handler.GetJobRuntimeWithoutFillingCache(
ctx, req.Id, h.jobFactory, h.jobRuntimeOps)
if err != nil {
log.WithError(err).
WithField("job_id", req.GetId().GetValue()).
Error("Failed to get runtime")
h.metrics.JobDeleteFail.Inc(1)
return nil, yarpcerrors.NotFoundErrorf("job not found")
}
if !util.IsPelotonJobStateTerminal(jobRuntime.State) {
h.metrics.JobDeleteFail.Inc(1)
return nil, yarpcerrors.InternalErrorf(
"Job is not in a terminal state: %s", jobRuntime.State)
}
// Delete job from DB
if err := h.jobStore.DeleteJob(ctx, req.GetId().GetValue()); err != nil {
h.metrics.JobDeleteFail.Inc(1)
log.WithError(err).
WithField("job_id", req.GetId().GetValue()).
Error("Delete job failed")
return nil, err
}
if err := h.jobIndexOps.Delete(ctx, req.GetId()); err != nil {
h.metrics.JobDeleteFail.Inc(1)
log.WithError(err).
WithField("job_id", req.GetId().GetValue()).
Error("Failed to delete job from job_index")
return nil, err
}
// Delete job from goalstate and cache
cachedJob := h.jobFactory.GetJob(req.GetId())
if cachedJob != nil {
taskMap := cachedJob.GetAllTasks()
for instID := range taskMap {
h.goalStateDriver.DeleteTask(req.GetId(), instID)
}
h.goalStateDriver.DeleteJob(req.GetId())
h.jobFactory.ClearJob(req.GetId())
}
h.metrics.JobDelete.Inc(1)
return &job.DeleteResponse{}, nil
}
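// Restart creates a RESTART workflow that restarts some or all instances
// of a service job.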
func (h *serviceHandler) Restart(
ctx context.Context,
req *job.RestartRequest) (resp *job.RestartResponse, err error) {
defer func() {
headers := yarpcutil.GetHeaders(ctx)
jobID := req.GetId().GetValue()
if err != nil {
log.WithField("job_id", jobID).
WithField("headers", headers).
WithError(err).
Warn("JobManager.Restart failed")
return
}
log.WithField("job_id", jobID).
WithField("headers", headers).
Info("JobManager.Restart succeeded")
}()
h.metrics.JobAPIRestart.Inc(1)
updateID, resourceVersion, err := h.createNonUpdateWorkflow(
ctx,
req.GetId(),
req.GetResourceVersion(),
req.GetRanges(),
req.GetRestartConfig().GetBatchSize(),
models.WorkflowType_RESTART,
)
if err != nil {
h.metrics.JobRestartFail.Inc(1)
return nil, err
}
h.metrics.JobRestart.Inc(1)
return &job.RestartResponse{
UpdateID: updateID,
ResourceVersion: resourceVersion,
}, nil
}
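// Start creates a START workflow that starts some or all instances
// of a service job.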
func (h *serviceHandler) Start(
ctx context.Context,
req *job.StartRequest) (resp *job.StartResponse, err error) {
defer func() {
headers := yarpcutil.GetHeaders(ctx)
jobID := req.GetId().GetValue()
if err != nil {
log.WithField("job_id", jobID).
WithField("headers", headers).
WithError(err).
Warn("JobManager.Start failed")
return
}
log.WithField("job_id", jobID).
WithField("headers", headers).
Info("JobManager.Start succeeded")
}()
h.metrics.JobAPIStart.Inc(1)
updateID, resourceVersion, err := h.createNonUpdateWorkflow(
ctx,
req.GetId(),
req.GetResourceVersion(),
req.GetRanges(),
req.GetStartConfig().GetBatchSize(),
models.WorkflowType_START,
)
if err != nil {
h.metrics.JobStartFail.Inc(1)
return nil, err
}
h.metrics.JobStart.Inc(1)
return &job.StartResponse{
UpdateID: updateID,
ResourceVersion: resourceVersion,
}, nil
}
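// Stop creates a STOP workflow that stops some or all instances
// of a service job.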
func (h *serviceHandler) Stop(
ctx context.Context,
req *job.StopRequest) (resp *job.StopResponse, err error) {
defer func() {
headers := yarpcutil.GetHeaders(ctx)
jobID := req.GetId().GetValue()
if err != nil {
log.WithField("job_id", jobID).
WithField("headers", headers).
WithError(err).
Warn("JobManager.Stop failed")
return
}
log.WithField("job_id", jobID).
WithField("headers", headers).
Info("JobManager.Stop succeeded")
}()
h.metrics.JobAPIStop.Inc(1)
updateID, resourceVersion, err := h.createNonUpdateWorkflow(
ctx,
req.GetId(),
req.GetResourceVersion(),
req.GetRanges(),
req.GetStopConfig().GetBatchSize(),
models.WorkflowType_STOP,
)
if err != nil {
h.metrics.JobStopFail.Inc(1)
return nil, err
}
h.metrics.JobStop.Inc(1)
return &job.StopResponse{
UpdateID: updateID,
ResourceVersion: resourceVersion,
}, nil
}
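// A hypothetical Stop call; the StopConfig message name is inferred from
// the getters used above and the values are illustrative:
//
//	resp, err := handler.Stop(ctx, &job.StopRequest{
//		Id:              jobID,
//		ResourceVersion: cfg.GetChangeLog().GetVersion(),
//		Ranges:          []*task.InstanceRange{{From: 0, To: 10}},
//		StopConfig:      &job.StopConfig{BatchSize: 2},
//	})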
// createNonUpdateWorkflow creates a workflow of any type except UPDATE
// (i.e. RESTART/START/STOP are supported). On success it returns the
// update ID and the new resource version.
func (h *serviceHandler) createNonUpdateWorkflow(
ctx context.Context,
jobID *peloton.JobID,
resourceVersion uint64,
ranges []*task.InstanceRange,
batchSize uint32,
workflowType models.WorkflowType,
) (*peloton.UpdateID, uint64, error) {
if workflowType == models.WorkflowType_UNKNOWN || workflowType == models.WorkflowType_UPDATE {
return nil, 0,
yarpcerrors.InvalidArgumentErrorf(
"unexpected WorkflowType_%s", workflowType.String())
}
if !h.candidate.IsLeader() {
return nil, 0,
yarpcerrors.UnavailableErrorf(
"Job %s API not suppported on non-leader", workflowType.String())
}
cachedJob := h.jobFactory.AddJob(jobID)
runtime, err := cachedJob.GetRuntime(ctx)
if err != nil {
return nil, 0, err
}
jobConfig, configAddOn, err := h.jobConfigOps.Get(
ctx,
jobID,
runtime.GetConfigurationVersion(),
)
if err != nil {
return nil, 0, err
}
if jobConfig.GetType() != job.JobType_SERVICE {
return nil, 0, yarpcerrors.InvalidArgumentErrorf(
"%s supported only for service jobs", workflowType.String())
}
// copy the config with provided resource version number
newConfig := *jobConfig
now := time.Now()
newConfig.ChangeLog = &peloton.ChangeLog{
Version: resourceVersion,
CreatedAt: uint64(now.UnixNano()),
UpdatedAt: uint64(now.UnixNano()),
}
if ranges == nil {
ranges = []*task.InstanceRange{
{From: 0, To: newConfig.GetInstanceCount()},
}
}
updateID, _, err := cachedJob.CreateWorkflow(
ctx,
workflowType,
&pbupdate.UpdateConfig{
BatchSize: batchSize,
},
versionutil.GetJobEntityVersion(
runtime.GetConfigurationVersion(),
runtime.GetDesiredStateVersion(),
runtime.GetWorkflowVersion()),
cached.WithInstanceToProcess(
nil,
convertRangesToSlice(ranges, newConfig.GetInstanceCount()),
nil),
cached.WithConfig(
&newConfig,
jobConfig,
configAddOn,
nil,
),
)
// In case of error, since it is not clear if job runtime was
// persisted with the update ID or not, enqueue the update to
// the goal state. If the update ID got persisted, update should
// start running, else, it should be aborted. Enqueueing it into
// the goal state will ensure both. In case the update was not
// persisted, clear the cache as well so that it is reloaded
// from DB and cleaned up.
// Add update to goal state engine to start it
if len(updateID.GetValue()) > 0 {
h.goalStateDriver.EnqueueUpdate(jobID, updateID, time.Now())
}
if err != nil {
return nil, 0, err
}
cachedConfig, err := cachedJob.GetConfig(ctx)
if err != nil {
return nil, 0, err
}
return updateID, cachedConfig.GetChangeLog().GetVersion(), err
}
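// GetCache returns the runtime and a subset of the config of a job
// from the in-memory cache.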
func (h *serviceHandler) GetCache(
ctx context.Context,
req *job.GetCacheRequest) (resp *job.GetCacheResponse, err error) {
defer func() {
headers := yarpcutil.GetHeaders(ctx)
jobID := req.GetId().GetValue()
if err != nil {
log.WithField("job_id", jobID).
WithField("headers", headers).
WithError(err).
Warn("JobManager.GetCache failed")
return
}
log.WithField("job_id", jobID).
WithField("headers", headers).
Debug("JobManager.GetCache succeeded")
}()
cachedJob := h.jobFactory.GetJob(req.GetId())
if cachedJob == nil {
return nil,
yarpcerrors.NotFoundErrorf("Job not found in cache")
}
runtime, err := cachedJob.GetRuntime(ctx)
if err != nil {
return nil,
yarpcerrors.InternalErrorf("Cannot get job runtime with error %v", err)
}
config, err := cachedJob.GetConfig(ctx)
if err != nil {
return nil,
yarpcerrors.InternalErrorf("Cannot get job config with error %v", err)
}
return &job.GetCacheResponse{
Runtime: runtime,
Config: &job.JobConfig{
ChangeLog: config.GetChangeLog(),
SLA: config.GetSLA(),
RespoolID: config.GetRespoolID(),
Type: config.GetType(),
InstanceCount: config.GetInstanceCount(),
},
}, nil
}
// GetActiveJobs is a debug-only API used to get the list of active job IDs
// stored in Peloton. It will be temporarily used for testing the consistency
// between the active_jobs table and the mv_job_by_state materialized view
func (h *serviceHandler) GetActiveJobs(
ctx context.Context,
req *job.GetActiveJobsRequest) (resp *job.GetActiveJobsResponse, err error) {
defer func() {
headers := yarpcutil.GetHeaders(ctx)
if err != nil {
log.WithField("headers", headers).
WithError(err).
Warn("JobManager.GetActiveJobs failed")
return
}
log.WithField("headers", headers).
Debug("JobManager.GetActiveJobs succeeded")
}()
jobIDs, err := h.activeJobsOps.GetAll(ctx)
if err != nil {
return nil, err
}
return &job.GetActiveJobsResponse{
Ids: jobIDs,
}, nil
}
// validateResourcePool validates the resource pool before submitting job
func (h *serviceHandler) validateResourcePool(
respoolID *peloton.ResourcePoolID,
) (*respool.ResourcePoolPath, error) {
ctx, cancelFunc := context.WithTimeout(h.rootCtx, 10*time.Second)
defer cancelFunc()
if respoolID == nil {
return nil, errNullResourcePoolID
}
if respoolID.GetValue() == common.RootResPoolID {
return nil, errRootResourcePoolID
}
var request = &respool.GetRequest{
Id: respoolID,
}
response, err := h.respoolClient.GetResourcePool(ctx, request)
if err != nil {
return nil, err
}
if response.GetError() != nil {
return nil, errResourcePoolNotFound
}
poolInfo := response.GetPoolinfo()
if poolInfo.GetId() == nil || poolInfo.GetId().GetValue() != respoolID.GetValue() {
return nil, errResourcePoolNotFound
}
if len(response.GetPoolinfo().GetChildren()) > 0 {
return nil, errNonLeafResourcePool
}
return response.GetPoolinfo().GetPath(), nil
}
// validateSecretsAndConfig checks the secrets for input sanity and makes sure
// that config does not contain any existing secret volumes because that is
// not supported.
func (h *serviceHandler) validateSecretsAndConfig(
config *job.JobConfig, secrets []*peloton.Secret) error {
// make sure that config doesn't have any secret volumes
if util.ConfigHasSecretVolumes(config.GetDefaultConfig()) {
return yarpcerrors.InvalidArgumentErrorf(
"adding secret volumes directly in config is not allowed",
)
}
// validate secrets payload for input sanity
if len(secrets) == 0 {
return nil
}
if !h.jobSvcCfg.EnableSecrets {
return yarpcerrors.InvalidArgumentErrorf(
"secrets not supported by cluster",
)
}
for _, secret := range secrets {
if secret.GetPath() == "" {
return yarpcerrors.InvalidArgumentErrorf(
"secret does not have a path")
}
// Validate that secret is base64 encoded
_, err := base64.StdEncoding.DecodeString(
string(secret.GetValue().GetData()))
if err != nil {
return yarpcerrors.InvalidArgumentErrorf(
"failed to decode secret with error: %v", err)
}
}
return nil
}
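// A well-formed secret as this validation expects it: a container path plus
// base64-encoded data. The nested Secret_Value type name follows standard
// protobuf-go naming and is an assumption:
//
//	secret := &peloton.Secret{
//		Path: "/tmp/mysecret",
//		Value: &peloton.Secret_Value{
//			Data: []byte(base64.StdEncoding.EncodeToString(
//				[]byte("top-secret"))),
//		},
//	}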
// validateMesosContainerizerForSecrets returns error if default config doesn't
// use mesos containerizer. Secrets will be common for all instances in a job.
// They will be a part of default container config. This means that if a job is
// created with secrets, we will ensure that the job also has a default config
// with mesos containerizer. The secrets will be used by all tasks in that job
// and all tasks must use mesos containerizer for processing secrets.
// We will not enforce that instance config has mesos containerizer and let
// instance config override this to keep with existing convention.
func validateMesosContainerizerForSecrets(jobConfig *job.JobConfig) error {
// make sure that default config uses mesos containerizer
if jobConfig.GetDefaultConfig().GetContainer().GetType() !=
mesos.ContainerInfo_MESOS {
return yarpcerrors.InvalidArgumentErrorf(
"container type %v does not match %v",
jobConfig.GetDefaultConfig().GetContainer().GetType(),
mesos.ContainerInfo_MESOS)
}
return nil
}
// handleCreateSecrets handles secrets to be added at the time of creating a job
func (h *serviceHandler) handleCreateSecrets(
ctx context.Context, jobID *peloton.JobID,
config *job.JobConfig, secrets []*peloton.Secret,
) error {
// if there are no secrets in the request,
// job create doesn't need to handle secrets
if len(secrets) == 0 {
return nil
}
// Make sure that the default config is using Mesos containerizer
if err := validateMesosContainerizerForSecrets(config); err != nil {
return err
}
// for each secret, store it in DB and add a secret volume to defaultconfig
return h.addSecretsToDBAndConfig(ctx, jobID, config, secrets, false)
}
// handleUpdateSecrets handles secrets to be added/updated for a job
func (h *serviceHandler) handleUpdateSecrets(
ctx context.Context, jobID *peloton.JobID, secretVolumes []*mesos.Volume,
newConfig *job.JobConfig, secrets []*peloton.Secret,
) error {
// if there are no existing secret volumes and no secrets in the request,
// this job update doesn't need to handle secrets
if len(secretVolumes) == 0 && len(secrets) == 0 {
return nil
}
// Make sure all existing secret volumes are covered in the secrets.
// Separate secrets into adds and updates.
addSecrets, updateSecrets, err := h.validateExistingSecretVolumes(
ctx, secretVolumes, secrets)
if err != nil {
return err
}
// add new secrets in DB and add them as secret volumes to defaultconfig
if err = h.addSecretsToDBAndConfig(
ctx, jobID, newConfig, addSecrets, false); err != nil {
return err
}
// update secrets in DB and add them as secret volumes to defaultconfig
return h.addSecretsToDBAndConfig(ctx, jobID, newConfig, updateSecrets, true)
}
func (h *serviceHandler) addSecretsToDBAndConfig(
ctx context.Context, jobID *peloton.JobID, jobConfig *job.JobConfig,
secrets []*peloton.Secret, update bool) error {
// for each secret, store it in DB and add a secret volume to defaultconfig
for _, secret := range secrets {
if secret.GetId().GetValue() == "" {
secret.Id = &peloton.SecretID{
Value: uuid.New(),
}
log.WithField("job_id", secret.GetId().GetValue()).
Info("Genarating UUID for empty secret ID")
}
// store secret in DB
if update {
if err := h.secretInfoOps.UpdateSecretData(
ctx,
jobID.GetValue(),
string(secret.GetValue().GetData()),
); err != nil {
return err
}
} else {
if err := h.secretInfoOps.CreateSecret(
ctx,
jobID.GetValue(),
time.Now(),
secret.Id.GetValue(),
string(secret.GetValue().GetData()),
secret.Path,
); err != nil {
return err
}
}
// Add volume/secret to default container config with this secret
// Use secretID instead of secret data when storing as
// part of default config in DB.
// This is done to prevent secrets leaks via logging/API etc.
// At the time of task launch, launcher will read the
// secret by secret-id and replace it by secret data.
jobConfig.GetDefaultConfig().GetContainer().Volumes =
append(jobConfig.GetDefaultConfig().GetContainer().Volumes,
util.CreateSecretVolume(secret.GetPath(),
secret.GetId().GetValue()),
)
}
return nil
}
// validateExistingSecretVolumes goes through the existing secret volumes and
// validates that the new secrets list covers every existing secret of the
// job. It splits the secrets in the request into addSecrets and
// updateSecrets. addSecrets will be newly created in the DB and added to the
// defaultconfig. updateSecrets will be updated in the DB only, because they
// are already present in defaultconfig.
//
// We do not have authN/authZ support on Peloton as of now.
// So there could be a security hole like this:
// Alice launches jobA with secrets a1,a2,a3
// Bob updates jobA and adds more tasks to it
// Bob is not authorized to use secrets a1,a2,a3 but the new task on jobA
// would still be able to access them
// To fix this hole, until authN/authZ is available, we will ensure that any
// Update request to a job that has secrets associated with it, contains
// existing secrets (same ID or path) as part of the request. The secret data
// could be different. This ensures that in the above example, Bob can never
// have access to a1,a2,a3.
// TODO: Remove this restriction after authN/authZ is enabled
// At that time, we will be sure that the job owner is also the secret
// owner and is updating the job
func (h *serviceHandler) validateExistingSecretVolumes(
ctx context.Context, secretVolumes []*mesos.Volume,
secrets []*peloton.Secret,
) (addSecrets []*peloton.Secret, updateSecrets []*peloton.Secret, err error) {
// the number of secrets in the request should be >= the number of
// existing secrets in the job config
if len(secrets) < len(secretVolumes) {
return nil, nil, yarpcerrors.InvalidArgumentErrorf(
"number of secrets in request should be >= existing secrets")
}
// create a map of new secrets provided in the request
secretMap := make(map[string]*peloton.Secret)
for _, secret := range secrets {
if secret.GetId().GetValue() != "" {
secretMap[secret.GetId().GetValue()] = secret
} else if secret.GetPath() != "" {
// TODO: Remove this after we have separate API
// for maintaining secrets at which point, secrets should be always
// identified by secretID or name (and not created as part of Job
// Create/Update API)
// currently, the provided secrets may or may not have an ID
// so we can identify them with Path
secretMap[secret.GetPath()] = secret
}
}
// Go through each secret volume, then verify that the secret is also
// present in the new secrets list
for _, volume := range secretVolumes {
// verify that the secret ID or Path in the existing secret volume
// is present in the secrets provided in the API request
existingSecretID := volume.GetSource().GetSecret().GetValue().GetData()
existingSecretPath := volume.GetContainerPath()
if secret, ok := secretMap[string(existingSecretID)]; ok {
updateSecrets = append(updateSecrets, secret)
delete(secretMap, string(existingSecretID))
} else if secret, ok := secretMap[string(existingSecretPath)]; ok {
// provided secret doesn't have ID but matches the path of an
// existing secret. Assign existing secretID to this.
secret.GetId().Value = string(existingSecretID)
updateSecrets = append(updateSecrets, secret)
delete(secretMap, string(existingSecretPath))
} else {
return nil, nil, yarpcerrors.InvalidArgumentErrorf(
"request missing secret with id %v path %v",
string(existingSecretID), existingSecretPath)
}
}
// The secrets that remain in secretMap do not exist yet;
// they should be added, not updated.
for _, secret := range secretMap {
addSecrets = append(addSecrets, secret)
}
return addSecrets, updateSecrets, nil
}
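// For example, if a job has existing secret volumes for secret IDs {s1, s2}
// and an Update request carries secrets {s1, s3}, validation fails because
// s2 is not covered. With request secrets {s1, s2, s3}, s1 and s2 become
// updateSecrets and s3 becomes an addSecret.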
// In a batch job update, the instance config of newConfig only needs to
// include the config for the additional instances. If the job config were
// directly updated to newConfig, JobMgr would lose track of the previous
// instance configs. As a result, JobMgr has to use the merged instance
// configs of oldConfig and newConfig.
// Configs passed to mergeInstanceConfig must have been validated.
func mergeInstanceConfig(oldConfig *job.JobConfig, newConfig *job.JobConfig) *job.JobConfig {
result := *newConfig
newInstanceConfig := make(map[uint32]*task.TaskConfig)
for instanceID, instanceConfig := range oldConfig.InstanceConfig {
newInstanceConfig[instanceID] = instanceConfig
}
for instanceID, instanceConfig := range newConfig.InstanceConfig {
newInstanceConfig[instanceID] = instanceConfig
}
result.InstanceConfig = newInstanceConfig
return &result
}
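// For example, merging oldConfig.InstanceConfig = {0: A, 1: B} with
// newConfig.InstanceConfig = {2: C} yields {0: A, 1: B, 2: C}; on
// overlapping instance IDs the entry from newConfig wins.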
// convertRangesToSlice merges the ranges into a single slice and removes
// any duplicated items.
// instanceCount is needed because the CLI may send max uint32 when a range
// is not specified.
// TODO: have the CLI send nil ranges when not specified
func convertRangesToSlice(ranges []*task.InstanceRange, instanceCount uint32) []uint32 {
var result []uint32
set := make(map[uint32]bool)
for _, instanceRange := range ranges {
for i := instanceRange.GetFrom(); i < instanceRange.GetTo(); i++ {
// ignore instances above instanceCount
if i >= instanceCount {
break
}
// dedup result
if !set[i] {
result = append(result, i)
set[i] = true
}
}
}
return result
}
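// For example, ranges [0,3) and [2,5) with instanceCount 4 yield
// [0 1 2 3]: the overlapping instance 2 is deduplicated and instance 4
// is dropped because it is >= instanceCount.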
// NewTestServiceHandler returns an empty serviceHandler pointer for testing.
func NewTestServiceHandler() *serviceHandler {
return &serviceHandler{}
}