// pkg/jobmgr/podsvc/handler.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package podsvc
import (
"context"
"time"
mesos "github.com/uber/peloton/.gen/mesos/v1"
pbjob "github.com/uber/peloton/.gen/peloton/api/v0/job"
v0peloton "github.com/uber/peloton/.gen/peloton/api/v0/peloton"
pbtask "github.com/uber/peloton/.gen/peloton/api/v0/task"
v1alphapeloton "github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton"
pbpod "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
"github.com/uber/peloton/.gen/peloton/api/v1alpha/pod/svc"
"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"
"github.com/uber/peloton/pkg/common/api"
"github.com/uber/peloton/pkg/common/leader"
"github.com/uber/peloton/pkg/common/util"
versionutil "github.com/uber/peloton/pkg/common/util/entityversion"
yarpcutil "github.com/uber/peloton/pkg/common/util/yarpc"
"github.com/uber/peloton/pkg/jobmgr/cached"
jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common"
"github.com/uber/peloton/pkg/jobmgr/goalstate"
"github.com/uber/peloton/pkg/jobmgr/logmanager"
jobmgrtask "github.com/uber/peloton/pkg/jobmgr/task"
goalstateutil "github.com/uber/peloton/pkg/jobmgr/util/goalstate"
taskutil "github.com/uber/peloton/pkg/jobmgr/util/task"
"github.com/uber/peloton/pkg/storage"
ormobjects "github.com/uber/peloton/pkg/storage/objects"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/yarpc"
"go.uber.org/yarpc/yarpcerrors"
)
const (
	// _frameworkName is the framework name used to look up the Mesos
	// framework ID from the framework info store.
	_frameworkName = "Peloton"
)

// _errPodNotInCache is returned when a runtime patch could not be applied
// because the pod was missing from the in-memory cache; the client should
// retry the action.
var _errPodNotInCache = yarpcerrors.InternalErrorf("pod not present in cache, please retry action")
// serviceHandler implements the v1alpha pod service API.
type serviceHandler struct {
	// jobStore is the persistent job store.
	jobStore storage.JobStore
	// podStore is the persistent task/pod store (runtimes, pod events).
	podStore storage.TaskStore
	// frameworkInfoStore resolves the registered Mesos framework ID.
	frameworkInfoStore storage.FrameworkInfoStore
	// podEventsOps provides ORM access to pod events.
	podEventsOps ormobjects.PodEventsOps
	// taskConfigV2Ops provides ORM access to versioned task configs.
	taskConfigV2Ops ormobjects.TaskConfigV2Ops
	// jobFactory is the in-memory cache of jobs and their tasks.
	jobFactory cached.JobFactory
	// goalStateDriver enqueues jobs/tasks into the goal state engine.
	goalStateDriver goalstate.Driver
	// candidate reports whether this instance is the leader; mutating
	// APIs are rejected on non-leaders.
	candidate leader.Candidate
	// logManager lists sandbox file paths on Mesos agents.
	logManager logmanager.LogManager
	// mesosAgentWorkDir is the work directory configured on Mesos agents,
	// used to construct sandbox paths.
	mesosAgentWorkDir string
	// hostMgrClient queries host manager for agent and master info.
	hostMgrClient hostsvc.InternalHostServiceYARPCClient
}
// InitV1AlphaPodServiceHandler initializes the Pod Service Handler and
// registers its YARPC procedures on the given dispatcher.
func InitV1AlphaPodServiceHandler(
	d *yarpc.Dispatcher,
	jobStore storage.JobStore,
	podStore storage.TaskStore,
	frameworkInfoStore storage.FrameworkInfoStore,
	ormStore *ormobjects.Store,
	jobFactory cached.JobFactory,
	goalStateDriver goalstate.Driver,
	candidate leader.Candidate,
	logManager logmanager.LogManager,
	mesosAgentWorkDir string,
	hostMgrClient hostsvc.InternalHostServiceYARPCClient,
) {
	h := &serviceHandler{
		jobStore:           jobStore,
		podStore:           podStore,
		frameworkInfoStore: frameworkInfoStore,
		jobFactory:         jobFactory,
		goalStateDriver:    goalStateDriver,
		candidate:          candidate,
		logManager:         logManager,
		mesosAgentWorkDir:  mesosAgentWorkDir,
		hostMgrClient:      hostMgrClient,
		// ORM accessors are derived from the shared store.
		podEventsOps:    ormobjects.NewPodEventsOps(ormStore),
		taskConfigV2Ops: ormobjects.NewTaskConfigV2Ops(ormStore),
	}
	d.Register(svc.BuildPodServiceYARPCProcedures(h))
}
// StartPod starts the pod named in the request. The owning job is first
// moved to PENDING with its default goal state, then the pod runtime is
// regenerated so the goal state engine will launch a new run. Only
// supported on the leader instance.
func (h *serviceHandler) StartPod(
	ctx context.Context,
	req *svc.StartPodRequest,
) (resp *svc.StartPodResponse, err error) {
	// Log the outcome and convert internal errors to YARPC errors on return.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("PodSVC.StartPod failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}
		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Info("PodSVC.StartPod succeeded")
	}()
	if !h.candidate.IsLeader() {
		return nil,
			yarpcerrors.UnavailableErrorf("PodSVC.StartPod is not supported on non-leader")
	}
	// The pod name is parsed into its job ID and instance ID.
	jobID, instanceID, err := util.ParseTaskID(req.GetPodName().GetValue())
	if err != nil {
		return nil, err
	}
	cachedJob := h.jobFactory.AddJob(&v0peloton.JobID{Value: jobID})
	cachedConfig, err := cachedJob.GetConfig(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job config")
	}
	// change the state of job first
	if err := h.startJob(ctx, cachedJob, cachedConfig); err != nil {
		// enqueue job state to goal state engine and let goal state engine
		// decide if the job state needs to be changed
		goalstate.EnqueueJobWithDefaultDelay(
			&v0peloton.JobID{Value: jobID}, h.goalStateDriver, cachedJob)
		return nil, err
	}
	// then change task state
	cachedTask, err := cachedJob.AddTask(ctx, instanceID)
	if err != nil {
		return nil, errors.Wrap(err, "fail to add pod in job cache")
	}
	err = h.startPod(ctx, cachedJob, cachedTask, cachedConfig.GetType())
	// enqueue the pod/job into goal state engine even in failure case.
	// Because the state may be updated, let goal state engine decide what to do
	h.goalStateDriver.EnqueueTask(&v0peloton.JobID{Value: jobID}, instanceID, time.Now())
	goalstate.EnqueueJobWithDefaultDelay(
		&v0peloton.JobID{Value: jobID}, h.goalStateDriver, cachedJob)
	if err != nil {
		return nil, err
	}
	return &svc.StartPodResponse{}, nil
}
// startJob sets the job state to PENDING, and set the goal state to
// RUNNING/SUCCEEDED based on job config
// startJob sets the job state to PENDING, and set the goal state to
// RUNNING/SUCCEEDED based on job config. Retries on concurrent
// runtime-version conflicts up to MaxConcurrencyErrorRetry attempts.
func (h *serviceHandler) startJob(
	ctx context.Context,
	cachedJob cached.Job,
	cachedConfig jobmgrcommon.JobConfig,
) error {
	for attempt := 0; ; attempt++ {
		jobRuntime, err := cachedJob.GetRuntime(ctx)
		if err != nil {
			return errors.Wrap(err, "fail to fetch job runtime")
		}
		// batch jobs in terminated state cannot be restarted
		if cachedConfig.GetType() == pbjob.JobType_BATCH &&
			util.IsPelotonJobStateTerminal(jobRuntime.GetState()) {
			return yarpcerrors.InvalidArgumentErrorf("cannot start pod in terminated job")
		}
		desiredGoalState := goalstateutil.GetDefaultJobGoalState(cachedConfig.GetType())
		// job already in expected state, skip the runtime update
		if jobRuntime.State == pbjob.JobState_PENDING &&
			jobRuntime.GoalState == desiredGoalState {
			return nil
		}
		jobRuntime.State = pbjob.JobState_PENDING
		jobRuntime.GoalState = desiredGoalState
		// update the job runtime
		_, err = cachedJob.CompareAndSetRuntime(ctx, jobRuntime)
		if err == nil {
			return nil
		}
		// retry only on concurrency (unexpected version) errors, and only
		// while the retry budget lasts
		if err == jobmgrcommon.UnexpectedVersionError &&
			attempt+1 < jobmgrcommon.MaxConcurrencyErrorRetry {
			continue
		}
		return errors.Wrap(err, "fail to update job runtime")
	}
}
// startPod resets the runtime of a single pod so the goal state engine
// will (re)launch it: it regenerates the mesos task runtime and sets the
// goal state back to the per-job-type default. Retries on concurrent
// runtime-version conflicts.
func (h *serviceHandler) startPod(
	ctx context.Context,
	cachedJob cached.Job,
	cachedTask cached.Task,
	jobType pbjob.JobType,
) error {
	count := 0
	for {
		taskRuntime, err := cachedTask.GetRuntime(ctx)
		if err != nil {
			return errors.Wrap(err, "fail to get pod runtime")
		}
		// pod whose goal state is already the default for this job type
		// needs no change; treat the start request as a no-op
		if taskRuntime.GetGoalState() == jobmgrtask.GetDefaultTaskGoalState(jobType) {
			return nil
		}
		// a pod that is marked for deletion must not be started again
		if taskRuntime.GetGoalState() == pbtask.TaskState_DELETED {
			return yarpcerrors.InvalidArgumentErrorf(
				"cannot start a pod going to be deleted")
		}
		// the task config is needed to determine the initial health state
		taskConfig, _, err := h.taskConfigV2Ops.GetTaskConfig(
			ctx,
			cachedJob.ID(),
			cachedTask.ID(),
			taskRuntime.GetConfigVersion(),
		)
		if err != nil {
			return errors.Wrap(err, "fail to get pod config")
		}
		healthState := taskutil.GetInitialHealthState(taskConfig)
		// regenerate the mesos task id / runtime for the new run
		taskutil.RegenerateMesosTaskRuntime(
			cachedJob.ID(),
			cachedTask.ID(),
			taskRuntime,
			healthState,
		)
		taskRuntime.GoalState =
			jobmgrtask.GetDefaultTaskGoalState(jobType)
		taskRuntime.Message = "PodSVC.StartPod request"
		if _, err = cachedJob.CompareAndSetTask(
			ctx,
			cachedTask.ID(),
			taskRuntime,
			false,
		); err == nil {
			return nil
		}
		// retry on concurrency (unexpected version) errors up to
		// MaxConcurrencyErrorRetry times
		if err == jobmgrcommon.UnexpectedVersionError {
			count = count + 1
			if count < jobmgrcommon.MaxConcurrencyErrorRetry {
				continue
			}
		}
		return errors.Wrap(err, "fail to update pod runtime")
	}
}
// StopPod stops the pod named in the request by patching its goal state
// to KILLED and enqueueing it to the goal state engine. Only supported on
// the leader instance.
func (h *serviceHandler) StopPod(
	ctx context.Context,
	req *svc.StopPodRequest,
) (resp *svc.StopPodResponse, err error) {
	// Log the outcome and convert internal errors to YARPC errors on return.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("PodSVC.StopPod failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}
		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Info("PodSVC.StopPod succeeded")
	}()
	if !h.candidate.IsLeader() {
		return nil,
			yarpcerrors.UnavailableErrorf("PodSVC.StopPod is not supported on non-leader")
	}
	jobID, instanceID, err := util.ParseTaskID(req.GetPodName().GetValue())
	if err != nil {
		return nil, err
	}
	cachedJob := h.jobFactory.AddJob(&v0peloton.JobID{Value: jobID})
	runtimeInfo, err := h.podStore.GetTaskRuntime(
		ctx, cachedJob.ID(), instanceID)
	if err != nil {
		return nil, err
	}
	if runtimeInfo.GetGoalState() == pbtask.TaskState_KILLED {
		// No-op if the pod is already KILLED
		return &svc.StopPodResponse{}, nil
	}
	// Patch the pod's desired state to KILLED, recording why it was
	// killed and clearing any desired host placement.
	runtimeDiff := make(map[uint32]jobmgrcommon.RuntimeDiff)
	runtimeDiff[instanceID] = jobmgrcommon.RuntimeDiff{
		jobmgrcommon.GoalStateField: pbtask.TaskState_KILLED,
		jobmgrcommon.MessageField:   "Task stop API request",
		jobmgrcommon.ReasonField:    "",
		jobmgrcommon.TerminationStatusField: &pbtask.TerminationStatus{
			Reason: pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_ON_REQUEST,
		},
		jobmgrcommon.DesiredHostField: "",
	}
	_, instancesToRetry, err := cachedJob.PatchTasks(ctx, runtimeDiff, false)
	// We should enqueue the tasks even if PatchTasks fail,
	// because some tasks may get updated successfully in db.
	// We can let goal state engine to decide whether or not to stop.
	h.goalStateDriver.EnqueueTask(
		&v0peloton.JobID{Value: jobID},
		instanceID,
		time.Now(),
	)
	// Patch succeeded overall but this instance was not in the cache:
	// surface a retryable error to the client.
	if err == nil && len(instancesToRetry) != 0 {
		return nil, _errPodNotInCache
	}
	return &svc.StopPodResponse{}, err
}
// RestartPod restarts the pod named in the request by setting its desired
// mesos task ID to the next run, optionally enforcing the job SLA when
// CheckSla is set. Only supported on the leader instance.
func (h *serviceHandler) RestartPod(
	ctx context.Context,
	req *svc.RestartPodRequest,
) (resp *svc.RestartPodResponse, err error) {
	// Log the outcome and convert internal errors to YARPC errors on return.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("PodSVC.RestartPod failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}
		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Info("PodSVC.RestartPod succeeded")
	}()
	if !h.candidate.IsLeader() {
		return nil,
			yarpcerrors.UnavailableErrorf("PodSVC.RestartPod is not supported on non-leader")
	}
	jobID, instanceID, err := util.ParseTaskID(req.GetPodName().GetValue())
	if err != nil {
		return nil, yarpcerrors.InvalidArgumentErrorf("invalid pod name")
	}
	cachedJob := h.jobFactory.AddJob(&v0peloton.JobID{Value: jobID})
	// Compute the mesos task ID of the next run; setting it as the desired
	// ID triggers the restart.
	newPodID, err := h.getPodIDForRestart(ctx,
		cachedJob,
		instanceID)
	if err != nil {
		return nil, err
	}
	runtimeDiff := make(map[uint32]jobmgrcommon.RuntimeDiff)
	runtimeDiff[instanceID] = jobmgrcommon.RuntimeDiff{
		jobmgrcommon.DesiredMesosTaskIDField: newPodID,
	}
	// Record a termination reason matching whether this is an SLA-aware
	// restart or a plain restart.
	if req.GetCheckSla() {
		runtimeDiff[instanceID][jobmgrcommon.TerminationStatusField] = &pbtask.TerminationStatus{
			Reason: pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_FOR_SLA_AWARE_RESTART,
		}
	} else {
		runtimeDiff[instanceID][jobmgrcommon.TerminationStatusField] = &pbtask.TerminationStatus{
			Reason: pbtask.TerminationStatus_TERMINATION_STATUS_REASON_KILLED_FOR_RESTART,
		}
	}
	instancesSucceeded, instancesToRetry, err := cachedJob.PatchTasks(ctx, runtimeDiff, false)
	// We should enqueue the tasks even if PatchTasks fail,
	// because some tasks may get updated successfully in db.
	// We can let goal state engine to decide whether or not to restart.
	h.goalStateDriver.EnqueueTask(
		&v0peloton.JobID{Value: jobID},
		instanceID,
		time.Now(),
	)
	// Patch succeeded overall but this instance was not in the cache:
	// surface a retryable error to the client.
	if err == nil && len(instancesToRetry) != 0 {
		return nil, _errPodNotInCache
	}
	// the restart would violate SLA
	if req.GetCheckSla() && len(instancesSucceeded) == 0 {
		return nil, yarpcerrors.AbortedErrorf("pod restart would violate SLA")
	}
	return &svc.RestartPodResponse{}, err
}
// GetPod returns the status — and, unless StatusOnly is set, the spec —
// of the current run of the pod, plus info for previous runs subject to
// the request's Limit.
func (h *serviceHandler) GetPod(
	ctx context.Context,
	req *svc.GetPodRequest,
) (resp *svc.GetPodResponse, err error) {
	// Log the outcome and convert internal errors to YARPC errors on return.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("PodSVC.GetPod failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}
		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Debug("PodSVC.GetPod succeeded")
	}()
	jobID, instanceID, err := util.ParseTaskID(req.GetPodName().GetValue())
	if err != nil {
		return nil, err
	}
	pelotonJobID := &v0peloton.JobID{Value: jobID}
	taskRuntime, err := h.podStore.GetTaskRuntime(ctx, pelotonJobID, instanceID)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get task runtime")
	}
	podStatus := api.ConvertTaskRuntimeToPodStatus(taskRuntime)
	// The pod spec is only fetched and converted when the caller asked
	// for more than status.
	var podSpec *pbpod.PodSpec
	if !req.GetStatusOnly() {
		taskConfig, _, err := h.taskConfigV2Ops.GetTaskConfig(
			ctx,
			pelotonJobID,
			instanceID,
			taskRuntime.GetConfigVersion(),
		)
		if err != nil {
			return nil, errors.Wrap(err, "failed to get task config")
		}
		podSpec = api.ConvertTaskConfigToPodSpec(
			taskConfig,
			jobID,
			instanceID,
		)
	}
	currentPodInfo := &pbpod.PodInfo{
		Spec:   podSpec,
		Status: podStatus,
	}
	// Limit == 1 means only the current run is wanted; otherwise walk
	// the previous runs (Limit == 0 appears to mean "no limit" — see
	// getPodInfoForAllPodRuns).
	var prevPodInfos []*pbpod.PodInfo
	if req.GetLimit() != 1 {
		prevPodInfos, err = h.getPodInfoForAllPodRuns(
			ctx,
			jobID,
			instanceID,
			podStatus.GetPrevPodId(),
			req.GetStatusOnly(),
			req.GetLimit(),
		)
		if err != nil {
			return nil, errors.Wrap(err, "failed to get pod info for previous runs")
		}
	}
	return &svc.GetPodResponse{
		Current:  currentPodInfo,
		Previous: prevPodInfos,
	}, nil
}
// GetPodEvents returns the pod events (state-transition history) stored
// for the pod run identified by the request's pod name and pod ID.
func (h *serviceHandler) GetPodEvents(
	ctx context.Context,
	req *svc.GetPodEventsRequest,
) (resp *svc.GetPodEventsResponse, err error) {
	// Log the outcome and normalize errors to YARPC errors on return.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("PodSVC.GetPodEvents failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}
		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Debug("PodSVC.GetPodEvents succeeded")
	}()
	jobID, instanceID, err := util.ParseTaskID(req.GetPodName().GetValue())
	if err != nil {
		return nil, err
	}
	events, err := h.podStore.GetPodEvents(
		ctx, jobID, instanceID, req.GetPodId().GetValue())
	if err != nil {
		return nil, errors.Wrap(err, "failed to get pod events from store")
	}
	resp = &svc.GetPodEventsResponse{Events: events}
	return resp, nil
}
// BrowsePodSandbox returns the sandbox file paths of the pod run
// identified by the request, along with the agent address hosting the
// sandbox and the Mesos master host/port.
func (h *serviceHandler) BrowsePodSandbox(
	ctx context.Context,
	req *svc.BrowsePodSandboxRequest,
) (resp *svc.BrowsePodSandboxResponse, err error) {
	// Log the outcome and convert internal errors to YARPC errors on return.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("PodSVC.BrowsePodSandbox failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}
		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Debug("PodSVC.BrowsePodSandbox succeeded")
	}()
	jobID, instanceID, err := util.ParseTaskID(req.GetPodName().GetValue())
	if err != nil {
		return nil, err
	}
	hostname, agentID, podID, frameworkID, err :=
		h.getSandboxPathInfo(
			ctx,
			jobID,
			instanceID,
			req.GetPodId().GetValue(),
		)
	if err != nil {
		return nil, err
	}
	// Extract the IP address + port of the agent, if possible,
	// because the hostname may not be resolvable on the network.
	// Fall back to the hostname and the default Mesos agent port.
	agentIP := hostname
	agentPort := "5051"
	agentResponse, err := h.hostMgrClient.GetMesosAgentInfo(ctx,
		&hostsvc.GetMesosAgentInfoRequest{Hostname: hostname})
	if err != nil || len(agentResponse.GetAgents()) == 0 {
		// Best-effort only: keep the hostname fallback, but include the
		// error (if any) so failures are diagnosable from the logs.
		log.WithField("hostname", hostname).
			WithError(err).
			Info("Could not get Mesos agent info")
	} else {
		// err is shadowed here; a PID parse failure just keeps the fallback.
		if ip, port, err := util.ExtractIPAndPortFromMesosAgentPID(
			agentResponse.GetAgents()[0].GetPid()); err == nil {
			agentIP = ip
			if port != "" {
				agentPort = port
			}
		}
	}
	var logPaths []string
	logPaths, err = h.logManager.ListSandboxFilesPaths(
		h.mesosAgentWorkDir,
		frameworkID,
		agentIP,
		agentPort,
		agentID,
		podID,
	)
	if err != nil {
		return nil, err
	}
	mesosMasterHostPortResponse, err := h.hostMgrClient.GetMesosMasterHostPort(
		ctx,
		&hostsvc.MesosMasterHostPortRequest{},
	)
	if err != nil {
		return nil, err
	}
	resp = &svc.BrowsePodSandboxResponse{
		Hostname:            agentIP,
		Port:                agentPort,
		Paths:               logPaths,
		MesosMasterHostname: mesosMasterHostPortResponse.GetHostname(),
		MesosMasterPort:     mesosMasterHostPortResponse.GetPort(),
	}
	return resp, nil
}
// RefreshPod reloads the pod's task info from storage into the job cache
// and enqueues both the pod and its job to the goal state engine. Only
// supported on the leader instance.
func (h *serviceHandler) RefreshPod(
	ctx context.Context,
	req *svc.RefreshPodRequest,
) (resp *svc.RefreshPodResponse, err error) {
	// Log the outcome and convert internal errors to YARPC errors on return.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("PodSVC.RefreshPod failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}
		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Info("PodSVC.RefreshPod succeeded")
	}()
	if !h.candidate.IsLeader() {
		return nil,
			yarpcerrors.UnavailableErrorf("PodSVC.RefreshPod is not supported on non-leader")
	}
	jobID, instanceID, err := util.ParseTaskID(req.GetPodName().GetValue())
	if err != nil {
		return nil, err
	}
	pelotonJobID := &v0peloton.JobID{Value: jobID}
	taskInfo, err := h.podStore.GetTaskForJob(ctx, jobID, instanceID)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get task info")
	}
	// Overwrite the cached runtime with what is in storage.
	cachedJob := h.jobFactory.AddJob(pelotonJobID)
	if err := cachedJob.ReplaceTasks(taskInfo, true); err != nil {
		return nil, errors.Wrap(err, "fail to replace task runtime")
	}
	// Let the goal state engine re-evaluate the refreshed pod and job.
	h.goalStateDriver.EnqueueTask(pelotonJobID, instanceID, time.Now())
	goalstate.EnqueueJobWithDefaultDelay(
		pelotonJobID, h.goalStateDriver, cachedJob)
	return &svc.RefreshPodResponse{}, nil
}
// GetPodCache returns the pod status and labels as currently held in the
// in-memory job cache, without reading from storage.
func (h *serviceHandler) GetPodCache(
	ctx context.Context,
	req *svc.GetPodCacheRequest,
) (resp *svc.GetPodCacheResponse, err error) {
	// Log the outcome and normalize errors to YARPC errors on return.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("PodSVC.GetPodCache failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}
		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Debug("PodSVC.GetPodCache succeeded")
	}()
	jobID, instanceID, err := util.ParseTaskID(req.GetPodName().GetValue())
	if err != nil {
		return nil, err
	}
	// Look up the job, then the task, purely from the cache.
	cachedJob := h.jobFactory.GetJob(&v0peloton.JobID{Value: jobID})
	if cachedJob == nil {
		return nil, yarpcerrors.NotFoundErrorf("job not found in cache")
	}
	cachedTask := cachedJob.GetTask(instanceID)
	if cachedTask == nil {
		return nil, yarpcerrors.NotFoundErrorf("task not found in cache")
	}
	runtime, err := cachedTask.GetRuntime(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get task runtime")
	}
	labels, err := cachedTask.GetLabels(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get task labels")
	}
	resp = &svc.GetPodCacheResponse{
		Status: api.ConvertTaskRuntimeToPodStatus(runtime),
		Labels: api.ConvertLabels(labels),
	}
	return resp, nil
}
// DeletePodEvents deletes the stored pod events of a single run — the
// run identified by the request's pod ID — for the named pod.
func (h *serviceHandler) DeletePodEvents(
	ctx context.Context,
	req *svc.DeletePodEventsRequest,
) (resp *svc.DeletePodEventsResponse, err error) {
	// Log the outcome and normalize errors to YARPC errors on return.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("PodSVC.DeletePodEvents failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}
		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Info("PodSVC.DeletePodEvents succeeded")
	}()
	jobID, instanceID, err := util.ParseTaskID(req.GetPodName().GetValue())
	if err != nil {
		return nil, err
	}
	runID, err := util.ParseRunID(req.GetPodId().GetValue())
	if err != nil {
		return nil, err
	}
	// Delete exactly one run's events: the half-open range [runID, runID+1).
	if err = h.podStore.DeletePodEvents(
		ctx, jobID, instanceID, runID, runID+1,
	); err != nil {
		return nil, err
	}
	return &svc.DeletePodEventsResponse{}, nil
}
// getHostInfo scans the pod events of the given run and returns the
// hostname and agent ID recorded on the first event whose actual state
// matches the service default pod goal state, along with the pod ID seen
// while scanning. hostname/agentID stay empty if no such event exists.
func (h *serviceHandler) getHostInfo(
	ctx context.Context,
	jobID string,
	instanceID uint32,
	podID string,
) (hostname, podid, agentID string, err error) {
	events, err := h.podStore.GetPodEvents(ctx, jobID, instanceID, podID)
	if err != nil {
		return "", "", "", errors.Wrap(err, "failed to get pod events")
	}
	targetState := jobmgrtask.GetDefaultPodGoalState(pbjob.JobType_SERVICE).String()
	for _, event := range events {
		podid = event.GetPodId().GetValue()
		if event.GetActualState() == targetState {
			hostname = event.GetHostname()
			agentID = event.GetAgentId()
			break
		}
	}
	return hostname, podid, agentID, nil
}
// getSandboxPathInfo - return details such as hostname, agentID,
// frameworkID and podName to create sandbox path.
func (h *serviceHandler) getSandboxPathInfo(ctx context.Context,
	jobID string,
	instanceID uint32,
	podID string,
) (hostname, agentID, podid, frameworkID string, err error) {
	host, pod, agent, err := h.getHostInfo(
		ctx,
		jobID,
		instanceID,
		podID,
	)
	if err != nil {
		return "", "", "", "", err
	}
	// A pod that never landed on a host has no sandbox to browse.
	if len(host) == 0 || len(agent) == 0 {
		return "", "", "", "", yarpcerrors.AbortedErrorf("pod has not been run")
	}
	// get framework ID.
	fwID, err := h.getFrameworkID(ctx)
	if err != nil {
		return "", "", "", "", err
	}
	return host, agent, pod, fwID, nil
}
// GetFrameworkID returns the frameworkID.
// getFrameworkID returns the Mesos framework ID registered under
// _frameworkName, or an error if the lookup fails or the stored ID is empty.
func (h *serviceHandler) getFrameworkID(ctx context.Context) (string, error) {
	id, err := h.frameworkInfoStore.GetFrameworkID(ctx, _frameworkName)
	switch {
	case err != nil:
		return id, err
	case id == "":
		// An empty ID means the framework has not registered yet.
		return id, yarpcerrors.InternalErrorf("framework id is empty")
	default:
		return id, nil
	}
}
// getPodIDForRestart returns the new pod id for restart: the mesos task
// ID of the next run (current run ID + 1).
func (h *serviceHandler) getPodIDForRestart(
	ctx context.Context,
	cachedJob cached.Job,
	instanceID uint32) (*mesos.TaskID, error) {
	runtime, err := h.podStore.GetTaskRuntime(ctx, cachedJob.ID(), instanceID)
	if err != nil {
		return nil, err
	}
	// If the current mesos task ID cannot be parsed (presumably the pod
	// has never run), fall back to run 0 so the restart starts at run 1.
	runID, err := util.ParseRunID(runtime.GetMesosTaskId().GetValue())
	if err != nil {
		runID = 0
	}
	return util.CreateMesosTaskID(cachedJob.ID(), instanceID, runID+1), nil
}
// getPodInfoForAllPodRuns walks backwards through previous pod runs,
// following each run's PrevPodId starting from podID, and builds one
// PodInfo per run. limit bounds the total number of runs the caller
// returns (current run plus these previous runs); limit == 0 means no
// limit. When statusOnly is false, each run's pod spec is also filled
// in, skipping runs whose task config can no longer be found.
func (h *serviceHandler) getPodInfoForAllPodRuns(
	ctx context.Context,
	jobID string,
	instanceID uint32,
	podID *v1alphapeloton.PodID,
	statusOnly bool,
	limit uint32,
) ([]*pbpod.PodInfo, error) {
	var podInfos []*pbpod.PodInfo
	pID := podID.GetValue()
	for {
		// No events for this run means we've walked past the oldest
		// available run — assumes the store returns an empty slice at
		// the end of the chain; TODO(review) confirm.
		podEvents, err := h.podStore.GetPodEvents(ctx, jobID, instanceID, pID)
		if err != nil {
			return nil, err
		}
		if len(podEvents) == 0 {
			break
		}
		// Stop once limit-1 previous runs are collected; the caller's
		// current run makes up the limit.
		if limit > 0 && uint32(len(podInfos)) >= (limit-1) {
			break
		}
		// podEvents[0] is used as the representative event for this run —
		// presumably the most recent one; TODO(review) confirm ordering.
		prevPodID := podEvents[0].GetPrevPodId().GetValue()
		agentID := podEvents[0].GetAgentId()
		podInfo := &pbpod.PodInfo{
			Status: &pbpod.PodStatus{
				State: pbpod.PodState(
					pbpod.PodState_value[podEvents[0].GetActualState()],
				),
				DesiredState: pbpod.PodState(
					pbpod.PodState_value[podEvents[0].GetDesiredState()],
				),
				PodId: &v1alphapeloton.PodID{
					Value: podEvents[0].GetPodId().GetValue(),
				},
				Host: podEvents[0].GetHostname(),
				AgentId: &mesos.AgentID{
					Value: &agentID,
				},
				Version:        podEvents[0].GetVersion(),
				DesiredVersion: podEvents[0].GetDesiredVersion(),
				Message:        podEvents[0].GetMessage(),
				Reason:         podEvents[0].GetReason(),
				PrevPodId: &v1alphapeloton.PodID{
					Value: prevPodID,
				},
				DesiredPodId: podEvents[0].GetDesiredPodId(),
			},
		}
		podInfos = append(podInfos, podInfo)
		// Advance to the previous run before any `continue` below so the
		// loop always makes progress.
		pID = prevPodID
		if !statusOnly {
			configVersion, err := versionutil.GetConfigVersion(
				podInfo.GetStatus().GetVersion(),
			)
			if err != nil {
				return nil,
					errors.Wrap(err, "failed to get config version for pod run")
			}
			taskConfig, _, err := h.taskConfigV2Ops.GetTaskConfig(
				ctx,
				&v0peloton.JobID{Value: jobID},
				instanceID,
				configVersion,
			)
			if err != nil {
				// If we aren't able to get pod spec for a particular run,
				// then we should just continue and fill up whatever
				// we can instead of throwing an error.
				if yarpcerrors.IsNotFound(err) {
					log.WithFields(
						log.Fields{
							"job_id":      jobID,
							"instance_id": instanceID,
						}).WithError(err).
						Info("failed to get task config")
					continue
				}
				return nil, errors.Wrap(err, "failed to get task config")
			}
			spec := api.ConvertTaskConfigToPodSpec(
				taskConfig,
				jobID,
				instanceID,
			)
			spec.PodName = &v1alphapeloton.PodName{
				Value: util.CreatePelotonTaskID(jobID, instanceID),
			}
			podInfo.Spec = spec
		}
	}
	return podInfos, nil
}
// NewTestServiceHandler returns an empty new ServiceHandler ptr for testing.
func NewTestServiceHandler() *serviceHandler {
	h := new(serviceHandler)
	return h
}