pkg/aurorabridge/handler.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aurorabridge
import (
"context"
"fmt"
"math"
"sort"
"strings"
"sync"
"time"
v0peloton "github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v1alpha/job/stateless"
statelesssvc "github.com/uber/peloton/.gen/peloton/api/v1alpha/job/stateless/svc"
"github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton"
"github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
podsvc "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod/svc"
"github.com/uber/peloton/.gen/peloton/private/jobmgrsvc"
"github.com/uber/peloton/.gen/thrift/aurora/api"
"github.com/uber/peloton/pkg/aurorabridge/atop"
"github.com/uber/peloton/pkg/aurorabridge/cache"
"github.com/uber/peloton/pkg/aurorabridge/common"
"github.com/uber/peloton/pkg/aurorabridge/label"
"github.com/uber/peloton/pkg/aurorabridge/opaquedata"
"github.com/uber/peloton/pkg/aurorabridge/ptoa"
"github.com/uber/peloton/pkg/common/concurrency"
"github.com/uber/peloton/pkg/common/util"
versionutil "github.com/uber/peloton/pkg/common/util/entityversion"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/uber-go/tally"
"go.uber.org/thriftrw/ptr"
"go.uber.org/yarpc/yarpcerrors"
)
var errUnimplemented = errors.New("rpc is unimplemented")
const (
// Minimum number of pod runs to query when PodRunsDepth in the config is
// set to 2 or greater; the result is still bounded by GetTasksPodMax.
minPodRunsDepth = 2
)
// jobCache is an internal struct used to capture the job id and name of a
// specific job. Mostly used as the job query return result.
type jobCache struct {
JobId *peloton.JobID
Name string
}
// ServiceHandler implements a partial Aurora API. Various unneeded methods have
// been left intentionally unimplemented.
type ServiceHandler struct {
config ServiceHandlerConfig
metrics *Metrics
jobClient statelesssvc.JobServiceYARPCClient
jobmgrClient jobmgrsvc.JobManagerServiceYARPCClient
podClient podsvc.PodServiceYARPCClient
respoolLoader RespoolLoader
random common.Random
jobIdCache cache.JobIDCache
}
// NewServiceHandler creates a new ServiceHandler.
func NewServiceHandler(
config ServiceHandlerConfig,
parent tally.Scope,
jobClient statelesssvc.JobServiceYARPCClient,
jobmgrClient jobmgrsvc.JobManagerServiceYARPCClient,
podClient podsvc.PodServiceYARPCClient,
respoolLoader RespoolLoader,
random common.Random,
jobIdCache cache.JobIDCache,
) (*ServiceHandler, error) {
config.normalize()
if err := config.validate(); err != nil {
return nil, err
}
return &ServiceHandler{
config: config,
metrics: NewMetrics(parent.SubScope("aurorabridge").SubScope("api")),
jobClient: jobClient,
jobmgrClient: jobmgrClient,
podClient: podClient,
respoolLoader: respoolLoader,
random: random,
jobIdCache: jobIdCache,
}, nil
}
// GetJobSummary returns a summary of jobs, optionally only those owned by a specific role.
func (h *ServiceHandler) GetJobSummary(
ctx context.Context,
role *string,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.getJobSummary(ctx, role)
resp := newResponse(result, err, "getJobSummary")
defer func() {
h.metrics.
Procedures[ProcedureGetJobSummary].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureGetJobSummary].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"role": role,
},
"code": err.responseCode,
"error": err.msg,
}).Error("GetJobSummary error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"role": role,
},
"result": result,
}).Debug("GetJobSummary success")
}()
return resp, nil
}
func (h *ServiceHandler) getJobSummary(
ctx context.Context,
role *string,
) (*api.Result, *auroraError) {
var jobIDs []*peloton.JobID
var err error
if role != nil && *role != "" {
jobIDs, err = h.getJobIDsFromRoleCache(ctx, *role)
} else {
jobIDs, err = h.queryJobIDs(ctx, "", "", "")
}
if err != nil {
return nil, auroraErrorf("get job ids from role: %s", err)
}
var inputs []interface{}
for _, j := range jobIDs {
inputs = append(inputs, j)
}
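// Each mapper resolves one job id into an Aurora JobSummary; jobs that no
// longer exist or have zero instances yield nil and are dropped below.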
f := func(ctx context.Context, input interface{}) (interface{}, error) {
jobID := input.(*peloton.JobID)
jobInfo, err := h.getJobInfo(ctx, jobID)
if err != nil {
if yarpcerrors.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("get job info for job id %q: %s",
jobID.GetValue(), err)
}
// If the job is stopped, omit it from the response
if jobInfo.GetSpec().GetInstanceCount() == 0 {
return nil, nil
}
// In Aurora, JobSummary.JobConfiguration.TaskConfig is generated from the
// latest "active" task. Reference:
// https://github.com/apache/aurora/blob/master/src/main/java/org/apache/aurora/scheduler/base/Tasks.java#L133
// Here we use JobInfo.JobSpec.DefaultSpec instead to simplify the query
// logic.
// TODO(kevinxu): Need to match Aurora's behavior?
// TODO(kevinxu): Need to inspect InstanceSpec as well?
podSpec := jobInfo.GetSpec().GetDefaultSpec()
s, err := ptoa.NewJobSummary(
convertJobInfoToJobSummary(jobInfo),
podSpec,
)
if err != nil {
return nil, fmt.Errorf("new job summary: %s", err)
}
return s, nil
}
outputs, err := concurrency.Map(
ctx,
concurrency.MapperFunc(f),
inputs,
h.config.GetJobSummaryWorkers)
if err != nil {
return nil, auroraErrorf(err.Error())
}
summaries := []*api.JobSummary{}
for _, o := range outputs {
if o == nil {
continue
}
summary := o.(*api.JobSummary)
if summary == nil {
continue
}
summaries = append(summaries, summary)
}
return &api.Result{
JobSummaryResult: &api.JobSummaryResult{
Summaries: summaries,
},
}, nil
}
// GetTasksWithoutConfigs is the same as getTasksStatus but without the TaskConfig.ExecutorConfig
// data set.
func (h *ServiceHandler) GetTasksWithoutConfigs(
ctx context.Context,
query *api.TaskQuery,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.getTasksWithoutConfigs(ctx, query)
resp := newResponse(result, err, "getTasksWithoutConfigs")
defer func() {
h.metrics.
Procedures[ProcedureGetTasksWithoutConfigs].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureGetTasksWithoutConfigs].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"query": query,
},
"code": err.responseCode,
"error": err.msg,
}).Error("GetTasksWithoutConfigs error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"query": query,
},
"result": result,
}).Debug("GetTasksWithoutConfigs success")
}()
return resp, nil
}
func (h *ServiceHandler) getTasksWithoutConfigs(
ctx context.Context,
query *api.TaskQuery,
) (*api.Result, *auroraError) {
var podStates []pod.PodState
for s := range query.GetStatuses() {
p, err := atop.NewPodState(s)
if err != nil {
return nil, auroraErrorf("new pod state: %s", err)
}
podStates = append(podStates, p)
}
jobIDs, err := h.getJobIDsFromTaskQuery(ctx, query)
if err != nil {
return nil, auroraErrorf("get job ids from task query: %s", err)
}
// concurrency.Map leaks goroutines when used in a nested setup, so a
// custom worker pool implementation is used here instead.
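// Each job gets its own goroutine; results and errors are collected by
// index and examined only after mapWait.Wait() returns.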
mapOutputs := make([]struct {
ts []*api.ScheduledTask
err error
}, len(jobIDs))
mapWait := sync.WaitGroup{}
for i, jobID := range jobIDs {
mapWait.Add(1)
go func(i int, j *peloton.JobID) {
defer mapWait.Done()
jobSummary, err := h.getJobInfoSummary(ctx, j)
if err != nil {
if yarpcerrors.IsNotFound(err) {
return
}
mapOutputs[i].err = fmt.Errorf(
"get job info for job id %q: %s",
j.GetValue(), err)
return
}
pods, err := h.queryPods(
ctx,
j,
jobSummary.GetInstanceCount(),
)
if err != nil {
mapOutputs[i].err = fmt.Errorf(
"query pods for job id %q: %s",
j.GetValue(), err)
return
}
ts, err := h.getScheduledTasks(
ctx,
jobSummary,
pods,
&taskFilter{statuses: query.GetStatuses()},
)
if err != nil {
mapOutputs[i].err = fmt.Errorf("get tasks without configs: %s", err)
return
}
mapOutputs[i].ts = ts
return
}(i, jobID)
}
mapWait.Wait()
tasks := []*api.ScheduledTask{}
for _, o := range mapOutputs {
if o.err != nil {
return nil, auroraErrorf(o.err.Error())
}
if o.ts == nil {
continue
}
tasks = append(tasks, o.ts...)
}
return &api.Result{
ScheduleStatusResult: &api.ScheduleStatusResult{
Tasks: tasks,
},
}, nil
}
type taskFilter struct {
statuses map[api.ScheduleStatus]struct{}
}
// include returns true if t is allowed by the filter.
func (f *taskFilter) include(t *api.ScheduledTask) bool {
if len(f.statuses) > 0 {
if _, ok := f.statuses[t.GetStatus()]; !ok {
return false
}
}
return true
}
type getScheduledTaskInput struct {
podName *peloton.PodName
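// podID is set only when querying a previous run; nil means the current
// run is queried by pod name.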
podID *peloton.PodID
instanceID uint32
// For current run
jobSummary *stateless.JobSummary
podSpec *pod.PodSpec
}
// getPodRunsLimit calculates the number of pod runs the
// getTasksWithoutConfigs endpoint will return, based on config.
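// For example, 10 pods with PodRunsDepth=5 and GetTasksPodMax=20 gives a
// budget of min(10*5, 20)=20 pods, i.e. ceil(20/10)=2 runs per pod; the
// result is never lower than min(PodRunsDepth, minPodRunsDepth).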
func getPodRunsLimit(
podsNum uint32,
podsMax uint32,
podRunsDepth uint32,
) uint32 {
if podsNum == 0 {
// Guard against division by zero below; with no pods the result is unused.
return util.Min(podRunsDepth, minPodRunsDepth)
}
podsNumTotal := util.Min(podsNum*podRunsDepth, podsMax)
podRuns := uint32(math.Ceil(float64(podsNumTotal) / float64(podsNum)))
return util.Max(podRuns, util.Min(podRunsDepth, minPodRunsDepth))
}
// getScheduledTasks generates a list of Aurora ScheduledTask in a worker
// pool.
func (h *ServiceHandler) getScheduledTasks(
ctx context.Context,
jobSummary *stateless.JobSummary,
podInfos []*pod.PodInfo,
filter *taskFilter,
) ([]*api.ScheduledTask, error) {
jobID := jobSummary.GetJobId()
podRunsDepth := getPodRunsLimit(
uint32(len(podInfos)),
uint32(h.config.GetTasksPodMax),
uint32(h.config.PodRunsDepth),
)
var inputs []interface{}
for _, p := range podInfos {
podSpec := p.GetSpec()
podID := p.GetStatus().GetPodId()
podName := podSpec.GetPodName()
runID, err := util.ParseRunID(podID.GetValue())
if err != nil {
return nil, fmt.Errorf("failed to parse pod id: %s", err)
}
_, instanceID, err := util.ParseTaskID(podName.GetValue())
if err != nil {
return nil, fmt.Errorf("failed to parse pod name: %s", err)
}
// When PodRunsDepth is set to 1, query only current-run pods; when set to
// a value larger than 1, query the current run plus previous runs.
for i := uint64(0); i < uint64(podRunsDepth); i++ {
newRunID := runID - i
if newRunID == 0 {
// No more previous run pods
break
}
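// Reconstruct the pod id for this run; it is only attached to the task
// input for previous runs, so the current run is queried by pod name alone.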
newPodID := &peloton.PodID{
Value: util.CreateMesosTaskID(&v0peloton.JobID{
Value: jobID.GetValue(),
}, instanceID, newRunID).GetValue(),
}
taskInput := &getScheduledTaskInput{
podName: podName,
instanceID: instanceID,
}
if i == 0 {
// Attach job summary and spec for the current run; leave podID nil so
// that the current run is queried by pod name.
taskInput.jobSummary = jobSummary
taskInput.podSpec = podSpec
} else {
// Attach for previous run
taskInput.podID = newPodID
}
inputs = append(inputs, taskInput)
}
}
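// Memoize job configs keyed by entity version so that pods from older runs
// sharing a version trigger only one GetJob call; the mutex guards the map
// across mapper goroutines.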
lock := &sync.Mutex{}
jobInfoVersionMap := make(map[string]*stateless.JobInfo)
getJobInfoForVersion := func(podVersion string) (*stateless.JobInfo, error) {
lock.Lock()
defer lock.Unlock()
var jobInfo *stateless.JobInfo
var err error
jobInfo, ok := jobInfoVersionMap[podVersion]
if !ok {
jobInfo, err = h.getFullJobInfoByVersion(
ctx,
jobID,
&peloton.EntityVersion{Value: podVersion},
)
if err != nil {
return nil, fmt.Errorf("get job info by version: %s", err)
}
jobInfoVersionMap[podVersion] = jobInfo
}
return jobInfo, nil
}
f := func(ctx context.Context, input interface{}) (interface{}, error) {
taskInput, ok := input.(*getScheduledTaskInput)
if !ok {
return nil, fmt.Errorf("failed to cast to get scheduled task input")
}
podName := taskInput.podName
podID := taskInput.podID
instanceID := taskInput.instanceID
podEvents, err := h.getPodEvents(
ctx,
podName,
podID,
)
if err != nil {
return nil, fmt.Errorf(
"get pod events for pod %q with pod id %q: %s",
podName.GetValue(), podID.GetValue(), err)
}
if len(podEvents) == 0 {
return nil, nil
}
var t *api.ScheduledTask
if taskInput.jobSummary != nil && taskInput.podSpec != nil {
// For current pod run
t, err = ptoa.NewScheduledTask(
taskInput.jobSummary,
taskInput.podSpec,
podEvents,
)
if err != nil {
return nil, fmt.Errorf(
"new scheduled task: %s", err)
}
} else {
// For previous pod run
podVersion := podEvents[0].GetVersion().GetValue()
if len(podVersion) == 0 {
return nil, fmt.Errorf(
"cannot find pod version for pod: %s",
podID.GetValue())
}
prevJobInfo, err := getJobInfoForVersion(podVersion)
if err != nil {
return nil, fmt.Errorf("get job info for version: %s", err)
}
prevJobSummary := convertJobInfoToJobSummary(prevJobInfo)
prevPodSpec := getPodSpecForInstance(prevJobInfo.GetSpec(), instanceID)
t, err = ptoa.NewScheduledTask(prevJobSummary, prevPodSpec, podEvents)
if err != nil {
return nil, fmt.Errorf(
"new scheduled task: %s", err)
}
}
if !filter.include(t) {
return nil, nil
}
return t, nil
}
workers := h.config.getTasksWithoutConfigsWorkers(len(inputs))
outputs, err := concurrency.Map(
ctx,
concurrency.MapperFunc(f),
inputs,
workers)
if err != nil {
return nil, err
}
var tasks []*api.ScheduledTask
for _, o := range outputs {
if o == nil {
continue
}
t := o.(*api.ScheduledTask)
if t == nil {
continue
}
tasks = append(tasks, t)
}
return tasks, nil
}
// GetConfigSummary fetches the configuration summary of active tasks for the specified job.
func (h *ServiceHandler) GetConfigSummary(
ctx context.Context,
job *api.JobKey) (*api.Response, error) {
startTime := time.Now()
result, err := h.getConfigSummary(ctx, job)
resp := newResponse(result, err, "getConfigSummary")
defer func() {
h.metrics.
Procedures[ProcedureGetConfigSummary].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureGetConfigSummary].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"job": job,
},
"code": err.responseCode,
"error": err.msg,
}).Error("GetConfigSummary error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"job": job,
},
"result": result,
}).Debug("GetConfigSummary success")
}()
return resp, nil
}
func (h *ServiceHandler) getConfigSummary(
ctx context.Context,
jobKey *api.JobKey,
) (*api.Result, *auroraError) {
jobID, err := h.getJobID(
ctx,
jobKey)
if err != nil {
return nil, auroraErrorf("unable to get jobID from jobKey: %s", err)
}
jobSummary, err := h.getJobInfoSummary(ctx, jobID)
if err != nil {
return nil, auroraErrorf("unable to get jobSummary from jobID: %s", err)
}
podInfos, err := h.queryPods(
ctx,
jobID,
jobSummary.GetInstanceCount())
if err != nil {
return nil, auroraErrorf("unable to query pods using jobID: %s", err)
}
configSummary, err := ptoa.NewConfigSummary(
jobSummary,
podInfos)
if err != nil {
return nil, auroraErrorf("unable to get config summary from podInfos: %s", err)
}
return &api.Result{
ConfigSummaryResult: &api.ConfigSummaryResult{
Summary: configSummary,
},
}, nil
}
// GetJobs fetches the status of jobs. ownerRole is optional, in which case all jobs are returned.
func (h *ServiceHandler) GetJobs(
ctx context.Context,
ownerRole *string,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.getJobs(ctx, ownerRole)
resp := newResponse(result, err, "getJobs")
defer func() {
h.metrics.
Procedures[ProcedureGetJobs].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureGetJobs].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"ownerRole": ownerRole,
},
"code": err.responseCode,
"error": err.msg,
}).Error("GetJobs error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"ownerRole": ownerRole,
},
"result": result,
}).Debug("GetJobs success")
}()
return resp, nil
}
func (h *ServiceHandler) getJobs(
ctx context.Context,
ownerRole *string,
) (*api.Result, *auroraError) {
var jobIDs []*peloton.JobID
var err error
if ownerRole != nil && *ownerRole != "" {
jobIDs, err = h.getJobIDsFromRoleCache(ctx, *ownerRole)
} else {
jobIDs, err = h.queryJobIDs(ctx, "", "", "")
}
if err != nil {
return nil, auroraErrorf("get job ids from role: %s", err)
}
var inputs []interface{}
for _, j := range jobIDs {
inputs = append(inputs, j)
}
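// Each mapper resolves one job id into an Aurora JobConfiguration; jobs that
// no longer exist or have zero instances yield nil and are dropped below.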
f := func(ctx context.Context, input interface{}) (interface{}, error) {
jobID := input.(*peloton.JobID)
jobInfo, err := h.getJobInfo(ctx, jobID)
if err != nil {
if yarpcerrors.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("get job info for job id %q: %s",
jobID.GetValue(), err)
}
// If the job is stopped, omit it from the response
if jobInfo.GetSpec().GetInstanceCount() == 0 {
return nil, nil
}
// In Aurora, JobConfiguration.TaskConfig is generated from the latest
// "active" task. Reference:
// https://github.com/apache/aurora/blob/master/src/main/java/org/apache/aurora/scheduler/base/Tasks.java#L133
// Here we use JobInfo.JobSpec.DefaultSpec instead to simplify the query
// logic.
// TODO(kevinxu): Need to match Aurora's behavior?
// TODO(kevinxu): Need to inspect InstanceSpec as well?
podSpec := jobInfo.GetSpec().GetDefaultSpec()
c, err := ptoa.NewJobConfiguration(
convertJobInfoToJobSummary(jobInfo),
podSpec,
true)
if err != nil {
return nil, fmt.Errorf("new job configuration: %s", err)
}
return c, nil
}
outputs, err := concurrency.Map(
ctx,
concurrency.MapperFunc(f),
inputs,
h.config.GetJobsWorkers)
if err != nil {
return nil, auroraErrorf(err.Error())
}
configs := []*api.JobConfiguration{}
for _, o := range outputs {
if o == nil {
continue
}
config := o.(*api.JobConfiguration)
if config == nil {
continue
}
configs = append(configs, config)
}
return &api.Result{
GetJobsResult: &api.GetJobsResult{
Configs: configs,
},
}, nil
}
// GetJobUpdateSummaries gets job update summaries.
func (h *ServiceHandler) GetJobUpdateSummaries(
ctx context.Context,
query *api.JobUpdateQuery,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.getJobUpdateSummaries(ctx, query)
resp := newResponse(result, err, "getJobUpdateSummaries")
defer func() {
h.metrics.
Procedures[ProcedureGetJobUpdateSummaries].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureGetJobUpdateSummaries].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"query": query,
},
"code": err.responseCode,
"error": err.msg,
}).Error("GetJobUpdateSummaries error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"query": query,
},
"result": result,
}).Debug("GetJobUpdateSummaries success")
}()
return resp, nil
}
func (h *ServiceHandler) getJobUpdateSummaries(
ctx context.Context,
query *api.JobUpdateQuery,
) (*api.Result, *auroraError) {
details, err := h.queryJobUpdates(ctx, query, false /* includeInstanceEvents */)
if err != nil {
return nil, auroraErrorf("query job updates: %s", err)
}
summaries := []*api.JobUpdateSummary{}
for _, d := range details {
summaries = append(summaries, d.GetUpdate().GetSummary())
}
return &api.Result{
GetJobUpdateSummariesResult: &api.GetJobUpdateSummariesResult{
UpdateSummaries: summaries,
},
}, nil
}
// GetJobUpdateDetails gets job update details.
// jobUpdateKey is marked as deprecated in Aurora and is not used by
// Aggregator, so it is ignored when getting job update details.
func (h *ServiceHandler) GetJobUpdateDetails(
ctx context.Context,
key *api.JobUpdateKey,
query *api.JobUpdateQuery,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.getJobUpdateDetails(ctx, key, query)
resp := newResponse(result, err, "getJobUpdateDetails")
defer func() {
h.metrics.
Procedures[ProcedureGetJobUpdateDetails].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureGetJobUpdateDetails].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"key": key,
"query": query,
},
"code": err.responseCode,
"error": err.msg,
}).Error("GetJobUpdateDetails error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"key": key,
"query": query,
},
"result": result,
}).Debug("GetJobUpdateDetails success")
}()
return resp, nil
}
func (h *ServiceHandler) getJobUpdateDetails(
ctx context.Context,
key *api.JobUpdateKey,
query *api.JobUpdateQuery,
) (*api.Result, *auroraError) {
if key.IsSetJob() {
query.JobKey = key.GetJob()
}
details, err := h.queryJobUpdates(ctx, query, true /* includeInstanceEvents */)
if err != nil {
return nil, auroraErrorf("query job updates: %s", err)
}
if details == nil {
details = []*api.JobUpdateDetails{}
}
return &api.Result{
GetJobUpdateDetailsResult: &api.GetJobUpdateDetailsResult{
DetailsList: details,
},
}, nil
}
// GetJobUpdateDiff gets the diff between client (desired) and server (current) job states.
// TaskConfig is not set in GetJobUpdateDiffResult, since the caller does not
// use it and fetching the previous pod spec is expensive.
func (h *ServiceHandler) GetJobUpdateDiff(
ctx context.Context,
request *api.JobUpdateRequest) (*api.Response, error) {
startTime := time.Now()
result, err := h.getJobUpdateDiff(ctx, request)
resp := newResponse(result, err, "getJobUpdateDiff")
defer func() {
h.metrics.
Procedures[ProcedureGetJobUpdateDiff].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureGetJobUpdateDiff].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"request": request,
},
"code": err.responseCode,
"error": err.msg,
}).Error("GetJobUpdateDiff error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"request": request,
},
"result": result,
}).Debug("GetJobUpdateDiff success")
}()
return resp, nil
}
func (h *ServiceHandler) getJobUpdateDiff(
ctx context.Context,
request *api.JobUpdateRequest,
) (*api.Result, *auroraError) {
respoolID, err := h.respoolLoader.Load(
ctx,
label.IsGpuConfig(
request.GetTaskConfig().GetMetadata(),
request.GetTaskConfig().GetResources(),
),
)
if err != nil {
return nil, auroraErrorf("load respool: %s", err)
}
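// newJobResult builds the diff for a job that does not exist yet: every
// requested instance appears in the Add group.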
newJobResult := func() (*api.Result, *auroraError) {
last := max(0, request.GetInstanceCount()-1)
return &api.Result{
GetJobUpdateDiffResult: &api.GetJobUpdateDiffResult{
Add: []*api.ConfigGroup{{
Instances: []*api.Range{{
First: ptr.Int32(0),
Last: ptr.Int32(last),
}},
}},
},
}, nil
}
jobID, err := h.getJobID(ctx, request.GetTaskConfig().GetJob())
if err != nil {
if yarpcerrors.IsNotFound(err) {
// Peloton returns errors for non-existent jobs in GetReplaceJobDiff,
// so construct the diff manually in this case.
return newJobResult()
}
return nil, auroraErrorf("get job id: %s", err)
}
jobSummary, err := h.getJobInfoSummary(ctx, jobID)
if err != nil {
if yarpcerrors.IsNotFound(err) {
// Peloton returns errors for non-existent jobs in GetReplaceJobDiff,
// so construct the diff manually in this case.
return newJobResult()
}
return nil, auroraErrorf("get job summary: %s", err)
}
jobSpec, err := atop.NewJobSpecFromJobUpdateRequest(
request,
respoolID,
h.config.ThermosExecutor,
)
if err != nil {
return nil, auroraErrorf("new job spec: %s", err)
}
resp, err := h.jobClient.GetReplaceJobDiff(
ctx,
&statelesssvc.GetReplaceJobDiffRequest{
JobId: jobID,
Version: jobSummary.GetStatus().GetVersion(),
Spec: jobSpec,
})
if err != nil {
return nil, auroraErrorf("get replace job diff: %s", err)
}
return &api.Result{GetJobUpdateDiffResult: &api.GetJobUpdateDiffResult{
Add: ptoa.NewConfigGroupWithoutTaskConfig(resp.GetInstancesAdded()),
Update: ptoa.NewConfigGroupWithoutTaskConfig(resp.GetInstancesUpdated()),
Remove: ptoa.NewConfigGroupWithoutTaskConfig(resp.GetInstancesRemoved()),
Unchanged: ptoa.NewConfigGroupWithoutTaskConfig(resp.GetInstancesUnchanged()),
}}, nil
}
// GetTierConfigs is a no-op. It is only used to determine liveness of the scheduler.
func (h *ServiceHandler) GetTierConfigs(
ctx context.Context,
) (*api.Response, error) {
startTime := time.Now()
result := &api.Result{
GetTierConfigResult: &api.GetTierConfigResult{
DefaultTierName: ptr.String(common.Preemptible),
Tiers: []*api.TierConfig{
{
Name: ptr.String(common.Revocable),
Settings: map[string]string{
common.Preemptible: "true",
common.Revocable: "true",
},
},
{
Name: ptr.String(common.Preferred),
Settings: map[string]string{
common.Preemptible: "false",
common.Revocable: "false",
},
},
{
Name: ptr.String(common.Preemptible),
Settings: map[string]string{
common.Preemptible: "true",
common.Revocable: "false",
},
},
},
},
}
resp := newResponse(result, nil)
defer func() {
h.metrics.
Procedures[ProcedureGetTierConfigs].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureGetTierConfigs].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
log.WithFields(log.Fields{
"result": result,
}).Debug("GetTierConfigs success")
}()
return resp, nil
}
// KillTasks initiates a kill on tasks.
func (h *ServiceHandler) KillTasks(
ctx context.Context,
job *api.JobKey,
instances map[int32]struct{},
message *string,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.killTasks(ctx, job, instances, message)
resp := newResponse(result, err, "killTasks")
defer func() {
h.metrics.
Procedures[ProcedureKillTasks].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureKillTasks].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
var instancesArr []string
for instanceID := range instances {
instancesArr = append(instancesArr, fmt.Sprint(instanceID))
}
instancesStr := strings.Join(instancesArr, ",")
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"job": job,
"instances": instancesStr,
"message": message,
},
"code": err.responseCode,
"error": err.msg,
}).Error("KillTasks error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"job": job,
"instances": instancesStr,
"message": message,
},
}).Info("KillTasks success")
}()
return resp, nil
}
func (h *ServiceHandler) killTasks(
ctx context.Context,
job *api.JobKey,
instances map[int32]struct{},
message *string,
) (*api.Result, *auroraError) {
id, err := h.getJobID(ctx, job)
if err != nil {
return nil, auroraErrorf("get job id: %s", err)
}
summary, err := h.getJobInfoSummary(ctx, id)
if err != nil {
return nil, auroraErrorf("get job info summary: %s", err)
}
stopAll := false
if uint32(len(instances)) == summary.GetInstanceCount() {
// Sanity check to make sure we don't stop everything if instances
// are out of bounds.
low, high := instanceBounds(instances)
if low == 0 && high == int32(len(instances)-1) {
stopAll = true
}
}
if len(instances) == 0 {
// If instances is not passed in, assume we are killing all tasks.
stopAll = true
}
if stopAll {
// If all instances are specified, issue a single StopJob instead of
// multiple StopPods for performance reasons.
req := &statelesssvc.StopJobRequest{
JobId: id,
Version: summary.GetStatus().GetVersion(),
}
if _, err := h.jobClient.StopJob(ctx, req); err != nil {
return nil, auroraErrorf("stop job: %s", err)
}
} else {
if err := h.stopPodsConcurrently(ctx, id, instances); err != nil {
return nil, auroraErrorf("stop pods in parallel: %s", err)
}
}
return dummyResult(), nil
}
func (h *ServiceHandler) stopPodsConcurrently(
ctx context.Context,
id *peloton.JobID,
instances map[int32]struct{},
) error {
var inputs []interface{}
for i := range instances {
inputs = append(inputs, i)
}
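// Each mapper stops a single pod, addressed by its Peloton task name
// ("<job-id>-<instance-id>").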
f := func(ctx context.Context, input interface{}) (interface{}, error) {
instanceID := input.(int32)
name := util.CreatePelotonTaskID(id.GetValue(), uint32(instanceID))
req := &podsvc.StopPodRequest{
PodName: &peloton.PodName{Value: name},
}
resp, err := h.podClient.StopPod(ctx, req)
if err != nil {
return nil, fmt.Errorf("stop pod %d: %s", instanceID, err)
}
return resp, nil
}
_, err := concurrency.Map(
ctx,
concurrency.MapperFunc(f),
inputs,
h.config.StopPodWorkers)
return err
}
// instanceBounds returns the lowest and highest instance id of
// instances. If instances is empty, returns -1, -1.
func instanceBounds(instances map[int32]struct{}) (low, high int32) {
if len(instances) == 0 {
return -1, -1
}
first := true
for i := range instances {
if first {
low, high = i, i
first = false
continue
}
if i < low {
low = i
}
if i > high {
high = i
}
}
return low, high
}
// StartJobUpdate starts an update of an existing service job.
func (h *ServiceHandler) StartJobUpdate(
ctx context.Context,
request *api.JobUpdateRequest,
message *string,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.startJobUpdate(ctx, request, message)
resp := newResponse(result, err, "startJobUpdate")
defer func() {
updateService := request.GetTaskConfig().GetJob().GetRole()
responseCode := resp.GetResponseCode()
responseTime := time.Since(startTime)
if len(updateService) > 0 {
h.metrics.
Procedures[ProcedureStartJobUpdate].
ResponseCodes[responseCode].
Scope.
Tagged(map[string]string{
TagService: updateService,
}).
Counter(MetricNameCalls).
Inc(1)
h.metrics.
Procedures[ProcedureStartJobUpdate].
ResponseCodes[responseCode].
Scope.
Tagged(map[string]string{
TagService: updateService,
}).
Timer(MetricNameCallLatency).
Record(responseTime)
} else {
h.metrics.
Procedures[ProcedureStartJobUpdate].
ResponseCodes[responseCode].
Calls.
Inc(1)
h.metrics.
Procedures[ProcedureStartJobUpdate].
ResponseCodes[responseCode].
CallLatency.
Record(responseTime)
}
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"request": request,
"message": message,
},
"code": err.responseCode,
"error": err.msg,
}).Error("StartJobUpdate error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"message": message,
"request": request, // TODO (varung): remove post PRR or as necessary
},
"update_id": result.GetStartJobUpdateResult().GetKey().GetID(),
"job_update_state": result.GetStartJobUpdateResult().GetUpdateSummary().GetState().String(),
}).Info("StartJobUpdate success")
}()
return resp, nil
}
// PauseJobUpdate pauses the specified job update. Can be resumed by resumeUpdate call.
func (h *ServiceHandler) PauseJobUpdate(
ctx context.Context,
key *api.JobUpdateKey,
message *string,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.pauseJobUpdate(ctx, key, message)
resp := newResponse(result, err, "pauseJobUpdate")
defer func() {
h.metrics.
Procedures[ProcedurePauseJobUpdate].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedurePauseJobUpdate].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"key": key,
"message": message,
},
"code": err.responseCode,
"error": err.msg,
}).Error("PauseJobUpdate error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"job": key.GetJob(),
"update_id": key.GetID(),
"message": message,
},
}).Info("PauseJobUpdate success")
}()
return resp, nil
}
func (h *ServiceHandler) pauseJobUpdate(
ctx context.Context,
key *api.JobUpdateKey,
message *string,
) (*api.Result, *auroraError) {
id, err := h.getJobID(ctx, key.GetJob())
if err != nil {
return nil, auroraErrorf("get job id: %s", err)
}
v, aerr := h.matchJobUpdateID(ctx, id, key.GetID())
if aerr != nil {
return nil, aerr
}
req := &statelesssvc.PauseJobWorkflowRequest{
JobId: id,
Version: v,
}
if _, err := h.jobClient.PauseJobWorkflow(ctx, req); err != nil {
return nil, auroraErrorf("pause job workflow: %s", err)
}
return dummyResult(), nil
}
// ResumeJobUpdate resumes progress of a previously paused job update.
func (h *ServiceHandler) ResumeJobUpdate(
ctx context.Context,
key *api.JobUpdateKey,
message *string,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.resumeJobUpdate(ctx, key, message)
resp := newResponse(result, err, "resumeJobUpdate")
defer func() {
h.metrics.
Procedures[ProcedureResumeJobUpdate].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureResumeJobUpdate].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"key": key,
"message": message,
},
"code": err.responseCode,
"error": err.msg,
}).Error("ResumeJobUpdate error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"job": key.GetJob(),
"update_id": key.GetID(),
"message": message,
},
}).Info("ResumeJobUpdate success")
}()
return resp, nil
}
func (h *ServiceHandler) resumeJobUpdate(
ctx context.Context,
key *api.JobUpdateKey,
message *string,
) (*api.Result, *auroraError) {
id, err := h.getJobID(ctx, key.GetJob())
if err != nil {
return nil, auroraErrorf("get job id: %s", err)
}
v, aerr := h.matchJobUpdateID(ctx, id, key.GetID())
if aerr != nil {
return nil, aerr
}
req := &statelesssvc.ResumeJobWorkflowRequest{
JobId: id,
Version: v,
}
if _, err := h.jobClient.ResumeJobWorkflow(ctx, req); err != nil {
return nil, auroraErrorf("resume job workflow: %s", err)
}
return dummyResult(), nil
}
// AbortJobUpdate permanently aborts the job update. Does not remove the update history.
func (h *ServiceHandler) AbortJobUpdate(
ctx context.Context,
key *api.JobUpdateKey,
message *string,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.abortJobUpdate(ctx, key, message)
resp := newResponse(result, err, "abortJobUpdate")
defer func() {
h.metrics.
Procedures[ProcedureAbortJobUpdate].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureAbortJobUpdate].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"key": key,
"message": message,
},
"code": err.responseCode,
"error": err.msg,
}).Error("AbortJobUpdate error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"job": key.GetJob(),
"update_id": key.GetID(),
"message": message,
},
}).Info("AbortJobUpdate success")
}()
return resp, nil
}
func (h *ServiceHandler) abortJobUpdate(
ctx context.Context,
key *api.JobUpdateKey,
message *string,
) (*api.Result, *auroraError) {
id, err := h.getJobID(ctx, key.GetJob())
if err != nil {
return nil, auroraErrorf("get job id: %s", err)
}
v, aerr := h.matchJobUpdateID(ctx, id, key.GetID())
if aerr != nil {
return nil, aerr
}
req := &statelesssvc.AbortJobWorkflowRequest{
JobId: id,
Version: v,
}
if _, err := h.jobClient.AbortJobWorkflow(ctx, req); err != nil {
return nil, auroraErrorf("abort job workflow: %s", err)
}
return dummyResult(), nil
}
// RollbackJobUpdate rolls back the specified active job update to the initial state.
func (h *ServiceHandler) RollbackJobUpdate(
ctx context.Context,
key *api.JobUpdateKey,
message *string,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.rollbackJobUpdate(ctx, key, message)
resp := newResponse(result, err, "rollbackJobUpdate")
defer func() {
h.metrics.
Procedures[ProcedureRollbackJobUpdate].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedureRollbackJobUpdate].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"key": key,
"message": message,
},
"code": err.responseCode,
"error": err.msg,
}).Error("RollbackJobUpdate error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"job": key.GetJob(),
"update_id": key.GetID(),
"message": message,
},
}).Info("RollbackJobUpdate success")
}()
return resp, nil
}
// _validRollbackStatuses enumerates the statuses which a job update must be in
// for rollback to be valid.
var _validRollbackStatuses = common.NewJobUpdateStatusSet(
api.JobUpdateStatusRollingForward,
api.JobUpdateStatusRollForwardPaused,
api.JobUpdateStatusRollForwardAwaitingPulse,
)
func (h *ServiceHandler) rollbackJobUpdate(
ctx context.Context,
key *api.JobUpdateKey,
message *string,
) (*api.Result, *auroraError) {
id, err := h.getJobID(ctx, key.GetJob())
if err != nil {
return nil, auroraErrorf("get job id: %s", err)
}
j, jobSpec, w, err := h.getJobAndWorkflow(ctx, id)
if err != nil {
return nil, auroraErrorf("get job: %s", err)
}
d, err := opaquedata.Deserialize(w.GetOpaqueData())
if err != nil {
return nil, auroraErrorf("deserialize opaque data: %s", err)
}
if d.UpdateID != key.GetID() {
return nil, auroraErrorf(
"update id does not match current update").
code(api.ResponseCodeInvalidRequest)
}
status, err := ptoa.NewJobUpdateStatus(w.GetStatus().GetState(), d)
if err != nil {
return nil, auroraErrorf("new job update status: %s", err)
}
if !_validRollbackStatuses.Has(status) {
return nil, auroraErrorf(
"invalid rollback: update must be in %s", _validRollbackStatuses).
code(api.ResponseCodeInvalidRequest)
}
d.AppendUpdateAction(opaquedata.Rollback)
od, err := d.Serialize()
if err != nil {
return nil, auroraErrorf("serialize opaque data: %s", err)
}
prevVersion := w.GetStatus().GetPrevVersion()
prevConfigVersion, _, _, err := versionutil.ParseJobEntityVersion(prevVersion)
if err != nil {
return nil, auroraErrorf("parse previous job entity version: %s", err)
}
var prevSpec *stateless.JobSpec
if prevConfigVersion == 0 {
// First deployment, so there is no previous version config; clone the
// current spec so that the read path still sees a valid spec.
prevSpec = proto.Clone(jobSpec).(*stateless.JobSpec)
// Set instance count to 0 so that the rollback brings the existing
// instances down.
prevSpec.InstanceCount = 0
} else {
prevJob, err := h.getFullJobInfoByVersion(ctx, id, prevVersion)
if err != nil {
return nil, auroraErrorf("get previous job: %s", err)
}
prevSpec = prevJob.GetSpec()
}
updateSpec := w.GetUpdateSpec()
// Never rollback a rollback.
updateSpec.RollbackOnFailure = false
updateSpec.StartPaused = status == api.JobUpdateStatusRollForwardAwaitingPulse
req := &statelesssvc.ReplaceJobRequest{
JobId: id,
Version: j.GetVersion(),
Spec: prevSpec,
//Secrets: nil,
UpdateSpec: updateSpec,
OpaqueData: od,
}
if _, err := h.jobClient.ReplaceJob(ctx, req); err != nil {
return nil, auroraErrorf("replace job: %s", err)
}
return dummyResult(), nil
}
// PulseJobUpdate allows progress of the job update in case blockIfNoPulsesAfterMs is specified in
// JobUpdateSettings. Unblocks progress if the update was previously blocked.
// Responds with ResponseCode.INVALID_REQUEST in case an unknown update key is specified.
func (h *ServiceHandler) PulseJobUpdate(
ctx context.Context,
key *api.JobUpdateKey,
) (*api.Response, error) {
startTime := time.Now()
result, err := h.pulseJobUpdate(ctx, key)
resp := newResponse(result, err, "pulseJobUpdate")
defer func() {
h.metrics.
Procedures[ProcedurePulseJobUpdate].
ResponseCodes[resp.GetResponseCode()].
Calls.Inc(1)
h.metrics.
Procedures[ProcedurePulseJobUpdate].
ResponseCodes[resp.GetResponseCode()].
CallLatency.Record(time.Since(startTime))
if err != nil {
log.WithFields(log.Fields{
"params": log.Fields{
"key": key,
},
"code": err.responseCode,
"error": err.msg,
}).Error("PulseJobUpdate error")
return
}
log.WithFields(log.Fields{
"params": log.Fields{
"job": key.GetJob(),
"update_id": key.GetID(),
},
"result": result.GetPulseJobUpdateResult().GetStatus().String(),
}).Info("PulseJobUpdate success")
}()
return resp, nil
}
// _validPulseStatuses enumerates the statuses which a job update must be in
// for pulse to be valid.
var _validPulseStatuses = common.NewJobUpdateStatusSet(
api.JobUpdateStatusRollForwardAwaitingPulse,
api.JobUpdateStatusRollBackAwaitingPulse,
)
func (h *ServiceHandler) pulseJobUpdate(
ctx context.Context,
key *api.JobUpdateKey,
) (*api.Result, *auroraError) {
id, err := h.getJobID(ctx, key.GetJob())
if err != nil {
aerr := auroraErrorf("get job id: %s", err)
if yarpcerrors.IsNotFound(err) {
// Unknown update.
// TODO(codyg): We should support some form of update ID.
aerr.code(api.ResponseCodeInvalidRequest)
}
return nil, aerr
}
j, _, w, err := h.getJobAndWorkflow(ctx, id)
if err != nil {
return nil, auroraErrorf("get job status: %s", err)
}
d, err := opaquedata.Deserialize(w.GetOpaqueData())
if err != nil {
return nil, auroraErrorf("deserialize opaque data: %s", err)
}
if d.UpdateID != key.GetID() {
return nil, auroraErrorf("update id does not match current update").
code(api.ResponseCodeInvalidRequest)
}
status, err := ptoa.NewJobUpdateStatus(w.GetStatus().GetState(), d)
if err != nil {
return nil, auroraErrorf("new job update status: %s", err)
}
// Only resume if we're in a valid status. Else, pulseJobUpdate is
// a no-op.
if _validPulseStatuses.Has(status) {
d.AppendUpdateAction(opaquedata.Pulse)
od, err := d.Serialize()
if err != nil {
return nil, auroraErrorf("serialize opaque data: %s", err)
}
req := &statelesssvc.ResumeJobWorkflowRequest{
JobId: id,
Version: j.GetVersion(),
OpaqueData: od,
}
if _, err := h.jobClient.ResumeJobWorkflow(ctx, req); err != nil {
return nil, auroraErrorf("resume job workflow: %s", err)
}
}
return &api.Result{
PulseJobUpdateResult: &api.PulseJobUpdateResult{
Status: api.JobUpdatePulseStatusOk.Ptr(),
},
}, nil
}
// queryJobUpdates is an awkward helper which returns JobUpdateDetails,
// including instance events if the flag is set.
func (h *ServiceHandler) queryJobUpdates(
ctx context.Context,
query *api.JobUpdateQuery,
includeInstanceEvents bool,
) ([]*api.JobUpdateDetails, error) {
filter := &updateFilter{
id: query.GetKey().GetID(),
statuses: query.GetUpdateStatuses(),
}
jobs, err := h.getJobCacheFromJobUpdateQuery(ctx, query)
if err != nil {
if yarpcerrors.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("get job summaries: %s", err)
}
var inputs []interface{}
for _, j := range jobs {
inputs = append(inputs, j)
}
f := func(ctx context.Context, input interface{}) (interface{}, error) {
return h.getFilteredJobUpdateDetails(
ctx, input.(*jobCache), filter, includeInstanceEvents)
}
outputs, err := concurrency.Map(
ctx,
concurrency.MapperFunc(f),
inputs,
h.config.GetJobUpdateWorkers)
if err != nil {
return nil, fmt.Errorf("build job update details: %s", err)
}
var results []*api.JobUpdateDetails
for _, o := range outputs {
for _, d := range o.([]*api.JobUpdateDetails) {
results = append(results, d)
}
}
return results, nil
}
type updateFilter struct {
id string
statuses map[api.JobUpdateStatus]struct{}
}
// include returns true if s is allowed by the filter.
func (f *updateFilter) include(s *api.JobUpdateSummary) bool {
if f.id != "" && f.id != s.GetKey().GetID() {
return false
}
if len(f.statuses) > 0 {
if _, ok := f.statuses[s.GetState().GetStatus()]; !ok {
return false
}
}
return true
}
// getFilteredJobUpdateDetails fetches updates for job and prunes them according to
// the filter.
func (h *ServiceHandler) getFilteredJobUpdateDetails(
ctx context.Context,
job *jobCache,
filter *updateFilter,
includeInstanceEvents bool,
) ([]*api.JobUpdateDetails, error) {
k, err := ptoa.NewJobKey(job.Name)
if err != nil {
return nil, fmt.Errorf("new job key: %s", err)
}
workflows, err := h.listWorkflows(ctx, job.JobId, includeInstanceEvents)
if err != nil {
if yarpcerrors.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("list workflows: %s", err)
}
// Sort workflows by descending time order
sort.Stable(sort.Reverse(ptoa.WorkflowsByMaxTS(workflows)))
// Filter out any non-update workflows (e.g. restart), since they are not
// valid updates generated by udeploy and do not contain valid opaque data.
workflows = filterWorkflows(workflows, stateless.WorkflowType_WORKFLOW_TYPE_UPDATE)
// Group updates by update id.
detailsByID := make(map[string][]*api.JobUpdateDetails)
var idOrder []string
for i, w := range workflows {
j := i + 1
var prevWorkflow *stateless.WorkflowInfo
if j < len(workflows) {
prevWorkflow = workflows[j]
}
// TODO(kevinxu): skip no-op update to match aurora's behavior.
// NOTE: If the update is no-op, aurora's behavior is to skip creating
// the update.
//if len(w.GetInstancesAdded())+len(w.GetInstancesUpdated())+
// len(w.GetInstancesRemoved()) == 0 {
// continue
//}
d, err := ptoa.NewJobUpdateDetails(k, prevWorkflow, w)
if err != nil {
return nil, fmt.Errorf("new job update details: %s", err)
}
id := d.GetUpdate().GetSummary().GetKey().GetID()
detailsByID[id] = append(detailsByID[id], d)
idOrder = append(idOrder, id)
}
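// Emit details in workflow order; deleting from detailsByID as we go ensures
// an update id that appears twice in idOrder (update plus rollback) is
// processed only once.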
var results []*api.JobUpdateDetails
for _, id := range idOrder {
details, ok := detailsByID[id]
if !ok {
continue
}
delete(detailsByID, id)
var d *api.JobUpdateDetails
switch len(details) {
case 1:
// NOTE: It is possible that this single update is actually a
// rollback whose original update has already been pruned from
// Peloton. This is *probably* fine, since the consumer of this API
// usually just cares if an update has been rolled back.
d = details[0]
case 2:
// If two updates have the same id, it means one was an update and
// the other was a rollback.
d = ptoa.JoinRollbackJobUpdateDetails(details[0], details[1])
default:
// Nothing to do here but make noise and ignore the update.
log.WithFields(log.Fields{
"job_id": job.JobId,
"details": details,
}).Error("Invariant violation: expected exactly 1 or 2 updates with same update id")
continue
}
if filter.include(d.GetUpdate().GetSummary()) {
results = append(results, d)
}
}
return results, nil
}
// getJobCacheFromJobUpdateQuery queries peloton jobs based on
// Aurora's JobUpdateQuery.
//
// Returns a yarpc NOT_FOUND error if no jobs match the query.
func (h *ServiceHandler) getJobCacheFromJobUpdateQuery(
ctx context.Context,
q *api.JobUpdateQuery,
) ([]*jobCache, error) {
if q.IsSetKey() {
return h.getJobCacheFromJobKey(ctx, q.GetKey().GetJob())
}
if q.IsSetJobKey() {
return h.getJobCacheFromJobKey(ctx, q.GetJobKey())
}
jobmgrJobCaches, err := h.queryJobCache(ctx, q.GetRole(), "", "")
if err != nil {
return nil, err
}
var jobCaches []*jobCache
for _, c := range jobmgrJobCaches {
jobCaches = append(jobCaches, &jobCache{
JobId: c.JobId,
Name: c.Name,
})
}
return jobCaches, nil
}
// getJobID maps k to a job id.
//
// Note: Since we do not delete job name to id mapping when a job
// is deleted, we cannot rely on not-found error to determine the
// existence of a job.
//
// TODO: To be deprecated in favor of getJobCacheFromJobKey.
// Aggregator expects the job key environment to be set in the
// GetJobUpdateDetails response in order to filter by deployment_id. When
// filtering by job key role only, the original peloton job name is not
// known, so the job key environment cannot be set.
func (h *ServiceHandler) getJobID(
ctx context.Context,
k *api.JobKey,
) (*peloton.JobID, error) {
req := &statelesssvc.GetJobIDFromJobNameRequest{
JobName: atop.NewJobName(k),
}
resp, err := h.jobClient.GetJobIDFromJobName(ctx, req)
if err != nil {
return nil, err
}
// results are sorted chronologically, return the latest one
return resp.GetJobId()[0], nil
}
// queryJobIDs takes optional job key components and returns the Peloton job ids
// which match the set parameters. E.g. queryJobIDs("myservice", "", "") will return
// job ids which match role=myservice.
func (h *ServiceHandler) queryJobIDs(
ctx context.Context,
role, env, name string,
) ([]*peloton.JobID, error) {
if role != "" && env != "" && name != "" {
// All job key components set, just use a job key query directly.
id, err := h.getJobID(ctx, &api.JobKey{
Role: ptr.String(role),
Environment: ptr.String(env),
Name: ptr.String(name),
})
if err != nil {
return nil, err
}
return []*peloton.JobID{id}, nil
}
jobCache, err := h.queryJobCache(ctx, role, env, name)
if err != nil {
return nil, err
}
jobIDs := make([]*peloton.JobID, 0, len(jobCache))
for _, cache := range jobCache {
jobIDs = append(jobIDs, cache.GetJobId())
}
return jobIDs, nil
}
// getJobCacheFromJobKey returns a jobCache struct based on the Aurora
// JobKey passed in.
func (h *ServiceHandler) getJobCacheFromJobKey(
ctx context.Context,
k *api.JobKey,
) ([]*jobCache, error) {
req := &statelesssvc.GetJobIDFromJobNameRequest{
JobName: atop.NewJobName(k),
}
resp, err := h.jobClient.GetJobIDFromJobName(ctx, req)
if err != nil {
return nil, err
}
// results are sorted chronologically, return the latest one
return []*jobCache{
{
JobId: resp.GetJobId()[0],
Name: atop.NewJobName(k),
},
}, nil
}
// queryJobCache calls jobmgr's private QueryJobCache API, passes the querying
// labels for role, env and name parameters, and returns a list of JobCache
// objects.
func (h *ServiceHandler) queryJobCache(
ctx context.Context,
role, env, name string,
) ([]*jobmgrsvc.QueryJobCacheResponse_JobCache, error) {
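// Always include the bridge job label so that only bridge-managed jobs are
// returned.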
labels := append(
label.BuildPartialAuroraJobKeyLabels(role, env, name),
common.BridgeJobLabel,
)
req := &jobmgrsvc.QueryJobCacheRequest{
Spec: &jobmgrsvc.QueryJobCacheRequest_CacheQuerySpec{
Labels: labels,
},
}
resp, err := h.jobmgrClient.QueryJobCache(ctx, req)
if err != nil {
return nil, err
}
return resp.GetResult(), nil
}
// getJobIDsFromTaskQuery queries peloton job ids based on an aurora TaskQuery.
// Note that it does not return an error when no job is found. The current
// behavior for querying:
// 1. If TaskQuery.JobKeys is present, use those job keys to query job ids.
// 2. Otherwise use TaskQuery.Role, TaskQuery.Environment and
// TaskQuery.JobName to construct a job key (those 3 fields may not all be
// present), and use it to query job ids.
//
// Note: Since getJobID() may return invalid job ids, e.g. job ids that were
// already deleted, be sure to check whether the error is "not-found" after
// querying using the job id.
func (h *ServiceHandler) getJobIDsFromTaskQuery(
ctx context.Context,
query *api.TaskQuery,
) ([]*peloton.JobID, error) {
if query == nil {
return nil, errors.New("task query is nil")
}
// use job_keys to query if present
if query.IsSetJobKeys() {
var ids []*peloton.JobID
for _, jobKey := range query.GetJobKeys() {
id, err := h.getJobID(ctx, jobKey)
if err != nil {
if yarpcerrors.IsNotFound(err) {
continue
}
return nil, errors.Wrapf(err, "get job id for %q", jobKey)
}
ids = append(ids, id)
}
return ids, nil
}
ids, err := h.queryJobIDs(
ctx, query.GetRole(), query.GetEnvironment(), query.GetJobName())
if err != nil {
if yarpcerrors.IsNotFound(err) {
// ignore not found error and return empty job ids
return nil, nil
}
return nil, errors.Wrapf(err, "get job ids")
}
return ids, nil
}
// getJobIDsFromRoleCache queries peloton job ids based on the aurora JobKey
// role. It looks at the internal job id cache first, and falls back to
// querying jobmgr's job cache (and repopulating the internal cache) on a
// miss.
func (h *ServiceHandler) getJobIDsFromRoleCache(
ctx context.Context,
role string,
) ([]*peloton.JobID, error) {
if ids := h.jobIdCache.GetJobIDs(role); len(ids) > 0 {
return ids, nil
}
jobCache, err := h.queryJobCache(ctx, role, "", "")
if err != nil {
return nil, err
}
h.jobIdCache.PopulateFromJobCache(role, jobCache)
jobIDs := make([]*peloton.JobID, 0, len(jobCache))
for _, cache := range jobCache {
jobIDs = append(jobIDs, cache.GetJobId())
}
return jobIDs, nil
}
// matchJobUpdateID matches a jobID workflow against updateID. Returns the entity
// version the workflow is moving towards. If the current workflow does not
// match updateID, returns an INVALID_REQUEST Aurora error.
func (h *ServiceHandler) matchJobUpdateID(
ctx context.Context,
jobID *peloton.JobID,
updateID string,
) (*peloton.EntityVersion, *auroraError) {
j, _, w, err := h.getJobAndWorkflow(ctx, jobID)
if err != nil {
return nil, auroraErrorf("get job status: %s", err)
}
d, err := opaquedata.Deserialize(w.GetOpaqueData())
if err != nil {
return nil, auroraErrorf("deserialize opaque data: %s", err)
}
if d.UpdateID != updateID {
return nil, auroraErrorf("update id does not match current update").
code(api.ResponseCodeInvalidRequest)
}
return j.GetVersion(), nil
}
// getJobInfo calls jobmgr to get JobInfo based on JobID.
func (h *ServiceHandler) getJobInfo(
ctx context.Context,
jobID *peloton.JobID,
) (*stateless.JobInfo, error) {
req := &statelesssvc.GetJobRequest{
JobId: jobID,
SummaryOnly: false,
}
resp, err := h.jobClient.GetJob(ctx, req)
if err != nil {
return nil, err
}
return resp.GetJobInfo(), nil
}
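// getFullJobInfoByVersion calls jobmgr to get JobInfo for a specific entity
// version of the job.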
func (h *ServiceHandler) getFullJobInfoByVersion(
ctx context.Context,
jobID *peloton.JobID,
v *peloton.EntityVersion,
) (*stateless.JobInfo, error) {
req := &statelesssvc.GetJobRequest{
JobId: jobID,
Version: v,
}
resp, err := h.jobClient.GetJob(ctx, req)
if err != nil {
return nil, err
}
return resp.GetJobInfo(), nil
}
// getJobInfoSummary calls jobmgr to get JobSummary based on JobID.
func (h *ServiceHandler) getJobInfoSummary(
ctx context.Context,
jobID *peloton.JobID,
) (*stateless.JobSummary, error) {
req := &statelesssvc.GetJobRequest{
JobId: jobID,
SummaryOnly: true,
}
resp, err := h.jobClient.GetJob(ctx, req)
if err != nil {
return nil, err
}
return resp.GetSummary(), nil
}
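// getJobAndWorkflow fetches a job's status, spec and current workflow info
// in a single GetJob call.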
func (h *ServiceHandler) getJobAndWorkflow(
ctx context.Context,
id *peloton.JobID,
) (*stateless.JobStatus, *stateless.JobSpec, *stateless.WorkflowInfo, error) {
resp, err := h.jobClient.GetJob(ctx, &statelesssvc.GetJobRequest{JobId: id})
if err != nil {
return nil, nil, nil, err
}
return resp.GetJobInfo().GetStatus(), resp.GetJobInfo().GetSpec(), resp.GetWorkflowInfo(), nil
}
// queryPods calls jobmgr to query a list of PodInfo based on input JobID.
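// It issues one GetPod call per instance, fanned out across the
// getTasksWithoutConfigs worker pool; instances whose task runtime no longer
// exists are skipped.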
func (h *ServiceHandler) queryPods(
ctx context.Context,
jobID *peloton.JobID,
instanceCount uint32,
) ([]*pod.PodInfo, error) {
var inputs []interface{}
for i := uint32(0); i < instanceCount; i++ {
inputs = append(inputs, fmt.Sprintf("%s-%d", jobID.GetValue(), i))
}
workers := h.config.getTasksWithoutConfigsWorkers(len(inputs))
f := func(ctx context.Context, input interface{}) (interface{}, error) {
podName := input.(string)
req := &podsvc.GetPodRequest{
PodName: &peloton.PodName{
Value: podName,
},
StatusOnly: false,
Limit: 1,
}
resp, err := h.podClient.GetPod(ctx, req)
if err != nil {
// If TaskRuntime does not exist, return nil PodInfo
if yarpcerrors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
return resp.GetCurrent(), nil
}
outputs, err := concurrency.Map(
ctx,
concurrency.MapperFunc(f),
inputs,
workers)
if err != nil {
return nil, err
}
podInfos := []*pod.PodInfo{}
for _, o := range outputs {
if o == nil {
continue
}
podInfo := o.(*pod.PodInfo)
if podInfo == nil {
continue
}
podInfos = append(podInfos, podInfo)
}
return podInfos, nil
}
// getPodEvents calls jobmgr to get a list of PodEvent based on PodName.
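// If podID is nil, events for the pod's current run are returned.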
func (h *ServiceHandler) getPodEvents(
ctx context.Context,
podName *peloton.PodName,
podID *peloton.PodID,
) ([]*pod.PodEvent, error) {
req := &podsvc.GetPodEventsRequest{
PodName: podName,
PodId: podID,
}
resp, err := h.podClient.GetPodEvents(ctx, req)
if err != nil {
return nil, err
}
return resp.GetEvents(), nil
}
// listWorkflows lists recent workflows for jobID.
//
// Returns yarpc NOT_FOUND error if no workflows found.
func (h *ServiceHandler) listWorkflows(
ctx context.Context,
jobID *peloton.JobID,
includeInstanceEvents bool,
) ([]*stateless.WorkflowInfo, error) {
req := &statelesssvc.ListJobWorkflowsRequest{
JobId: jobID,
InstanceEvents: includeInstanceEvents,
UpdatesLimit: h.config.UpdatesLimit,
InstanceEventsLimit: h.config.InstanceEventsLimit,
}
resp, err := h.jobClient.ListJobWorkflows(ctx, req)
if err != nil {
return nil, err
}
return resp.GetWorkflowInfos(), nil
}
func max(a, b int32) int32 {
if a > b {
return a
}
return b
}
// filterWorkflows returns a new slice of WorkflowInfo filtered by WorkflowType
func filterWorkflows(
ws []*stateless.WorkflowInfo,
wt stateless.WorkflowType,
) []*stateless.WorkflowInfo {
wsf := make([]*stateless.WorkflowInfo, 0)
for _, w := range ws {
if w.GetStatus().GetType() != wt {
continue
}
wsf = append(wsf, w)
}
return wsf
}
// dummyResult returns a dummy result since YARPC won't allow
// an empty union
func dummyResult() *api.Result {
return &api.Result{
GetTierConfigResult: &api.GetTierConfigResult{},
}
}
// convertJobInfoToJobSummary converts a JobInfo to a JobSummary.
func convertJobInfoToJobSummary(
jobInfo *stateless.JobInfo,
) *stateless.JobSummary {
return &stateless.JobSummary{
JobId: jobInfo.GetJobId(),
Name: jobInfo.GetSpec().GetName(),
Owner: jobInfo.GetSpec().GetOwner(),
InstanceCount: jobInfo.GetSpec().GetInstanceCount(),
Sla: jobInfo.GetSpec().GetSla(),
Labels: jobInfo.GetSpec().GetLabels(),
RespoolId: jobInfo.GetSpec().GetRespoolId(),
Status: jobInfo.GetStatus(),
}
}