pkg/jobmgr/jobsvc/private/handler.go (369 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package private
import (
"context"
"time"
pbjob "github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v1alpha/job/stateless"
v1alphapeloton "github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton"
"github.com/uber/peloton/.gen/peloton/private/jobmgrsvc"
"github.com/uber/peloton/pkg/common/api"
"github.com/uber/peloton/pkg/common/leader"
"github.com/uber/peloton/pkg/common/util"
versionutil "github.com/uber/peloton/pkg/common/util/entityversion"
yarpcutil "github.com/uber/peloton/pkg/common/util/yarpc"
"github.com/uber/peloton/pkg/jobmgr/cached"
jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common"
"github.com/uber/peloton/pkg/jobmgr/goalstate"
"github.com/uber/peloton/pkg/storage"
ormobjects "github.com/uber/peloton/pkg/storage/objects"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/yarpc"
"go.uber.org/yarpc/yarpcerrors"
)
// serviceHandler implements the Job Manager's private API; its YARPC
// procedures are registered by InitPrivateJobServiceHandler via
// jobmgrsvc.BuildJobManagerServiceYARPCProcedures.
//
// Most handlers read from the in-memory job cache (jobFactory); RefreshJob
// additionally reads job runtime/config through the ORM accessors.
type serviceHandler struct {
	jobStore    storage.JobStore
	updateStore storage.UpdateStore
	taskStore   storage.TaskStore
	// ORM accessors, all constructed from the same ormobjects.Store in
	// InitPrivateJobServiceHandler.
	jobIndexOps    ormobjects.JobIndexOps
	jobConfigOps   ormobjects.JobConfigOps
	jobRuntimeOps  ormobjects.JobRuntimeOps
	jobNameToIDOps ormobjects.JobNameToIDOps
	// jobFactory holds the cached jobs that the handlers read from.
	jobFactory cached.JobFactory
	// goalStateDriver gates QueryJobCache (Started) and receives jobs
	// enqueued by RefreshJob.
	goalStateDriver goalstate.Driver
	// candidate is consulted by RefreshJob to reject requests on
	// non-leader instances.
	candidate leader.Candidate
	// NOTE(review): rootCtx is neither set by InitPrivateJobServiceHandler
	// nor read anywhere in this file; confirm whether it is still needed.
	rootCtx context.Context
}
// InitPrivateJobServiceHandler builds the Job Manager's private API handler
// from the given stores, job cache, goal state driver and leader candidate,
// and registers its YARPC procedures on dispatcher d.
//
// ormStore backs the job index, config, runtime and name-to-ID accessors
// that the handler uses.
func InitPrivateJobServiceHandler(
	d *yarpc.Dispatcher,
	jobStore storage.JobStore,
	updateStore storage.UpdateStore,
	taskStore storage.TaskStore,
	ormStore *ormobjects.Store,
	jobFactory cached.JobFactory,
	goalStateDriver goalstate.Driver,
	candidate leader.Candidate,
) {
	h := &serviceHandler{
		jobStore:        jobStore,
		updateStore:     updateStore,
		taskStore:       taskStore,
		jobFactory:      jobFactory,
		goalStateDriver: goalStateDriver,
		candidate:       candidate,
	}

	// All ORM-backed accessors share the same underlying store.
	h.jobIndexOps = ormobjects.NewJobIndexOps(ormStore)
	h.jobConfigOps = ormobjects.NewJobConfigOps(ormStore)
	h.jobRuntimeOps = ormobjects.NewJobRuntimeOps(ormStore)
	h.jobNameToIDOps = ormobjects.NewJobNameToIDOps(ormStore)

	procedures := jobmgrsvc.BuildJobManagerServiceYARPCProcedures(h)
	d.Register(procedures)
}
// GetThrottledPods returns the names of all pods, across the SERVICE-type
// jobs currently held in the job cache, whose task runtime is classified
// as throttled by util.IsTaskThrottled.
//
// A job whose config cannot be read, or a task whose runtime cannot be
// read, is logged and skipped so that one bad cache entry does not fail
// the whole request.
func (h *serviceHandler) GetThrottledPods(
	ctx context.Context,
	req *jobmgrsvc.GetThrottledPodsRequest,
) (resp *jobmgrsvc.GetThrottledPodsResponse, err error) {
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			// Log under this RPC's real name and at the same level as the
			// other handlers in this file.
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("JobSVC.GetThrottledPods failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}

		log.WithField("request", req).
			WithField("headers", headers).
			Debug("JobSVC.GetThrottledPods succeeded")
	}()

	var throttledPods []*v1alphapeloton.PodName
	for jobID, cachedJob := range h.jobFactory.GetAllJobs() {
		cachedConfig, cfgErr := cachedJob.GetConfig(ctx)
		if cfgErr != nil {
			log.WithError(cfgErr).
				WithField("job_id", jobID).
				Info("Failed to get job config during fetching throttled pods")
			continue
		}

		// Only SERVICE jobs are inspected for throttled pods.
		if cachedConfig.GetType() != pbjob.JobType_SERVICE {
			continue
		}

		for instID, cachedTask := range cachedJob.GetAllTasks() {
			taskRuntime, rtErr := cachedTask.GetRuntime(ctx)
			if rtErr != nil {
				log.WithError(rtErr).
					WithFields(log.Fields{
						"job_id":      jobID,
						"instance_id": instID,
					}).
					Info("Failed to get task runtime during fetching throttled pods")
				continue
			}

			if util.IsTaskThrottled(taskRuntime.GetState(), taskRuntime.GetMessage()) {
				throttledPods = append(throttledPods, &v1alphapeloton.PodName{
					Value: util.CreatePelotonTaskID(jobID, instID),
				})
			}
		}
	}

	resp = &jobmgrsvc.GetThrottledPodsResponse{
		ThrottledPods: throttledPods,
	}
	return resp, nil
}
// RefreshJob reloads a job's runtime and the matching configuration
// version from storage into the job cache, then enqueues the job into the
// goal state driver.
//
// It is served only by the leader; on a non-leader an Unavailable error is
// returned. Failures reading storage or updating the cached job are
// returned to the caller (and converted to YARPC errors by the deferred
// handler) instead of being silently ignored.
func (h *serviceHandler) RefreshJob(
	ctx context.Context,
	req *jobmgrsvc.RefreshJobRequest) (resp *jobmgrsvc.RefreshJobResponse, err error) {
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("JobSVC.RefreshJob failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}

		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Info("JobSVC.RefreshJob succeeded")
	}()

	if !h.candidate.IsLeader() {
		return nil,
			yarpcerrors.UnavailableErrorf("JobSVC.RefreshJob is not supported on non-leader")
	}

	pelotonJobID := &peloton.JobID{Value: req.GetJobId().GetValue()}

	jobRuntime, err := h.jobRuntimeOps.Get(ctx, pelotonJobID)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job runtime")
	}

	// Load the config version that the stored runtime points at.
	jobConfig, configAddOn, err := h.jobConfigOps.Get(
		ctx,
		pelotonJobID,
		jobRuntime.GetConfigurationVersion())
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job config")
	}

	cachedJob := h.jobFactory.AddJob(pelotonJobID)
	// The data was just read from storage, so cached.UpdateCacheOnly is
	// passed (presumably so only the in-memory copy is rewritten). A failed
	// cache update must not be reported as a successful refresh.
	err = cachedJob.Update(ctx, &pbjob.JobInfo{
		Config:  jobConfig,
		Runtime: jobRuntime,
	}, configAddOn,
		nil,
		cached.UpdateCacheOnly)
	if err != nil {
		return nil, errors.Wrap(err, "fail to update job cache")
	}

	h.goalStateDriver.EnqueueJob(pelotonJobID, time.Now())
	return &jobmgrsvc.RefreshJobResponse{}, nil
}
// GetJobCache returns the spec and status of a job as held in the job
// cache. When the cached runtime references an update ID, the status also
// carries that workflow's status. A job missing from the cache yields a
// NotFound error.
func (h *serviceHandler) GetJobCache(
	ctx context.Context,
	req *jobmgrsvc.GetJobCacheRequest) (resp *jobmgrsvc.GetJobCacheResponse, err error) {
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err == nil {
			log.WithField("request", req).
				WithField("response", resp).
				WithField("headers", headers).
				Debug("JobSVC.GetJobCache succeeded")
			return
		}
		log.WithField("request", req).
			WithField("headers", headers).
			WithError(err).
			Warn("JobSVC.GetJobCache failed")
		err = yarpcutil.ConvertToYARPCError(err)
	}()

	job := h.jobFactory.GetJob(&peloton.JobID{Value: req.GetJobId().GetValue()})
	if job == nil {
		return nil, yarpcerrors.NotFoundErrorf("job not found in cache")
	}

	jobRuntime, err := job.GetRuntime(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job runtime")
	}

	jobConfig, err := job.GetConfig(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job config")
	}

	// Without an update ID there is no workflow to look up, and the
	// workflow status is left nil.
	var workflow cached.Update
	if updateID := jobRuntime.GetUpdateID(); updateID.GetValue() != "" {
		workflow = job.GetWorkflow(updateID)
	}

	jobStatus := convertCacheToJobStatus(jobRuntime)
	jobStatus.WorkflowStatus = convertCacheToWorkflowStatus(workflow)

	resp = &jobmgrsvc.GetJobCacheResponse{
		Spec:   convertCacheJobConfigToJobSpec(jobConfig),
		Status: jobStatus,
	}
	return resp, nil
}
// QueryJobCache lists the cached jobs whose name and labels match the
// query spec. An empty name, or an empty label list, in the spec matches
// every job. Until the goal state driver has finished starting, the query
// is rejected with an Unavailable error.
func (h *serviceHandler) QueryJobCache(
	ctx context.Context,
	req *jobmgrsvc.QueryJobCacheRequest,
) (resp *jobmgrsvc.QueryJobCacheResponse, err error) {
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err == nil {
			log.WithField("request", req).
				WithField("num_of_results", len(resp.GetResult())).
				WithField("headers", headers).
				Debug("JobSVC.QueryJobCache succeeded")
			return
		}
		log.WithField("request", req).
			WithField("headers", headers).
			WithError(err).
			Warn("JobSVC.QueryJobCache failed")
		err = yarpcutil.ConvertToYARPCError(err)
	}()

	if !h.goalStateDriver.Started() {
		return nil, yarpcerrors.UnavailableErrorf(
			"QueryJobCache is not available until goal state driver finish start process")
	}

	querySpec := req.GetSpec()
	var matches []*jobmgrsvc.QueryJobCacheResponse_JobCache
	for _, cachedJob := range h.jobFactory.GetAllJobs() {
		cfg, cfgErr := cachedJob.GetConfig(ctx)
		if cfgErr != nil {
			// The job may already be removed from storage but not yet
			// evicted from the job factory; skip it. Any other error
			// aborts the query.
			if yarpcerrors.IsNotFound(cfgErr) {
				continue
			}
			return nil, cfgErr
		}

		if !nameMatch(cfg.GetName(), querySpec.GetName()) {
			continue
		}
		if !labelMatch(api.ConvertLabels(cfg.GetLabels()), querySpec.GetLabels()) {
			continue
		}

		matches = append(matches, &jobmgrsvc.QueryJobCacheResponse_JobCache{
			JobId: &v1alphapeloton.JobID{Value: cachedJob.ID().GetValue()},
			Name:  cfg.GetName(),
		})
	}

	return &jobmgrsvc.QueryJobCacheResponse{Result: matches}, nil
}
// GetInstanceAvailabilityInfoForJob reports, for the requested instances of
// a cached job, the name of each instance's availability type (looked up in
// jobmgrcommon.InstanceAvailability_name).
//
// If the job is not in the job cache, a NotFound error is returned. This
// matches GetJobCache and avoids calling a method on a nil cached job.
func (h *serviceHandler) GetInstanceAvailabilityInfoForJob(
	ctx context.Context,
	req *jobmgrsvc.GetInstanceAvailabilityInfoForJobRequest,
) (resp *jobmgrsvc.GetInstanceAvailabilityInfoForJobResponse, err error) {
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("JobSVC.GetInstanceAvailabilityInfoForJob failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}

		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Debug("JobSVC.GetInstanceAvailabilityInfoForJob succeeded")
	}()

	cachedJob := h.jobFactory.GetJob(&peloton.JobID{Value: req.GetJobId().GetValue()})
	if cachedJob == nil {
		return nil,
			yarpcerrors.NotFoundErrorf("job not found in cache")
	}

	availability := cachedJob.GetInstanceAvailabilityType(ctx, req.GetInstances()...)
	instanceAvailabilityMap := make(map[uint32]string, len(availability))
	for instanceID, availabilityType := range availability {
		instanceAvailabilityMap[instanceID] = jobmgrcommon.InstanceAvailability_name[availabilityType]
	}

	return &jobmgrsvc.GetInstanceAvailabilityInfoForJobResponse{
		InstanceAvailabilityMap: instanceAvailabilityMap,
	}, nil
}
// nameMatch reports whether a job named jobName satisfies a name query.
// An empty queryName matches any job; otherwise the two names must be
// exactly equal (case-sensitive).
func nameMatch(jobName string, queryName string) bool {
	return queryName == "" || queryName == jobName
}
// labelMatch reports whether jobLabels contains every key/value pair in
// queryLabels. An empty queryLabels matches any set of job labels. If
// jobLabels repeats a key, only the last value for that key is compared.
// A query label must have its key present in jobLabels; a missing key does
// not match, even when the query value is empty.
func labelMatch(jobLabels []*v1alphapeloton.Label, queryLabels []*v1alphapeloton.Label) bool {
	if len(queryLabels) == 0 {
		return true
	}

	jobLabelValues := constructLabelsMap(jobLabels)
	for _, want := range queryLabels {
		got, present := jobLabelValues[want.GetKey()]
		if !present || got != want.GetValue() {
			return false
		}
	}
	return true
}
// constructLabelsMap flattens labels into a key -> value map. When a key
// appears more than once, the value of its last occurrence wins.
func constructLabelsMap(labels []*v1alphapeloton.Label) map[string]string {
	byKey := make(map[string]string, len(labels))
	for i := range labels {
		byKey[labels[i].GetKey()] = labels[i].GetValue()
	}
	return byKey
}
// convertCacheToJobStatus converts a cached v0 job runtime into a v1alpha
// stateless JobStatus. It fills in the revision, state, creation time, pod
// stats, desired state and entity version. WorkflowStatus is left unset for
// the caller to fill in.
//
// Every runtime field is read through the generated getters. These
// tolerate a nil receiver, so this function does not itself dereference a
// nil runtime. Previously TaskStats was the one field read directly.
func convertCacheToJobStatus(
	runtime *pbjob.RuntimeInfo,
) *stateless.JobStatus {
	revision := runtime.GetRevision()
	return &stateless.JobStatus{
		Revision: &v1alphapeloton.Revision{
			Version:   revision.GetVersion(),
			CreatedAt: revision.GetCreatedAt(),
			UpdatedAt: revision.GetUpdatedAt(),
			UpdatedBy: revision.GetUpdatedBy(),
		},
		State:        stateless.JobState(runtime.GetState()),
		CreationTime: runtime.GetCreationTime(),
		PodStats:     api.ConvertTaskStatsToPodStats(runtime.GetTaskStats()),
		DesiredState: stateless.JobState(runtime.GetGoalState()),
		Version: versionutil.GetJobEntityVersion(
			runtime.GetConfigurationVersion(),
			runtime.GetDesiredStateVersion(),
			runtime.GetWorkflowVersion()),
	}
}
// convertCacheToWorkflowStatus converts a cached workflow (cached.Update)
// into a v1alpha stateless WorkflowStatus. A nil workflow yields a nil
// status.
//
// The done and failed instance lists are each fetched once. The completed,
// failed and remaining counts are therefore all derived from the same
// slices, rather than from separate calls that could observe different
// progress. The remaining count is clamped at zero so the uint32
// conversion can never wrap around to a huge value.
func convertCacheToWorkflowStatus(
	cachedWorkflow cached.Update,
) *stateless.WorkflowStatus {
	if cachedWorkflow == nil {
		return nil
	}

	currentState := cachedWorkflow.GetState()
	goalState := cachedWorkflow.GetGoalState()
	instancesDone := cachedWorkflow.GetInstancesDone()
	instancesFailed := cachedWorkflow.GetInstancesFailed()

	remaining := len(goalState.Instances) - len(instancesDone) - len(instancesFailed)
	if remaining < 0 {
		remaining = 0
	}

	return &stateless.WorkflowStatus{
		Type:                  stateless.WorkflowType(cachedWorkflow.GetWorkflowType()),
		State:                 stateless.WorkflowState(currentState.State),
		PrevState:             stateless.WorkflowState(cachedWorkflow.GetPrevState()),
		NumInstancesCompleted: uint32(len(instancesDone)),
		NumInstancesFailed:    uint32(len(instancesFailed)),
		NumInstancesRemaining: uint32(remaining),
		InstancesCurrent:      cachedWorkflow.GetInstancesCurrent(),
		PrevVersion:           versionutil.GetPodEntityVersion(currentState.JobVersion),
		Version:               versionutil.GetPodEntityVersion(goalState.JobVersion),
	}
}
// convertCacheJobConfigToJobSpec converts a jobmgrcommon.JobConfig into a
// partially populated v1alpha stateless JobSpec. Only the fields shared by
// full and cached job configs are copied: instance count, resource pool,
// SLA (only when the config has one) and revision (from the change log).
//
// TODO: when config is a full *pbjob.JobConfig, set the rest of the
// JobSpec fields as well.
func convertCacheJobConfigToJobSpec(config jobmgrcommon.JobConfig) *stateless.JobSpec {
	changeLog := config.GetChangeLog()
	spec := &stateless.JobSpec{
		InstanceCount: config.GetInstanceCount(),
		RespoolId: &v1alphapeloton.ResourcePoolID{
			Value: config.GetRespoolID().GetValue(),
		},
		Revision: &v1alphapeloton.Revision{
			Version:   changeLog.GetVersion(),
			CreatedAt: changeLog.GetCreatedAt(),
			UpdatedAt: changeLog.GetUpdatedAt(),
			UpdatedBy: changeLog.GetUpdatedBy(),
		},
	}

	if sla := config.GetSLA(); sla != nil {
		spec.Sla = &stateless.SlaSpec{
			Priority:                    sla.GetPriority(),
			Preemptible:                 sla.GetPreemptible(),
			Revocable:                   sla.GetRevocable(),
			MaximumUnavailableInstances: sla.GetMaximumUnavailableInstances(),
		}
	}

	return spec
}
// NewTestServiceHandler returns a pointer to a zero-valued serviceHandler
// with no dependencies wired in. It is meant for tests only.
func NewTestServiceHandler() *serviceHandler {
	return new(serviceHandler)
}