// pkg/jobmgr/task/placement/placement.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package placement
import (
"context"
"encoding/base64"
"time"
mesos "github.com/uber/peloton/.gen/mesos/v1"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
pbpod "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
"github.com/uber/peloton/.gen/peloton/private/resmgr"
"github.com/uber/peloton/.gen/peloton/private/resmgrsvc"
"github.com/uber/peloton/pkg/common"
"github.com/uber/peloton/pkg/common/api"
"github.com/uber/peloton/pkg/common/lifecycle"
"github.com/uber/peloton/pkg/common/util"
"github.com/uber/peloton/pkg/jobmgr/cached"
jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common"
"github.com/uber/peloton/pkg/jobmgr/goalstate"
"github.com/uber/peloton/pkg/jobmgr/task/lifecyclemgr"
taskutil "github.com/uber/peloton/pkg/jobmgr/util/task"
ormobjects "github.com/uber/peloton/pkg/storage/objects"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/uber-go/tally"
"go.uber.org/yarpc"
"go.uber.org/yarpc/yarpcerrors"
)
// Config is placement processor specific config
type Config struct {
// PlacementDequeueLimit is the maximum number of placements the placement
// processor dequeues in a single GetPlacements call.
PlacementDequeueLimit int `yaml:"placement_dequeue_limit"`
// GetPlacementsTimeout is the timeout, in milliseconds, sent with the
// GetPlacements request.
GetPlacementsTimeout int `yaml:"get_placements_timeout_ms"`
}
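// An illustrative sketch of the corresponding YAML (the field tags are taken
// from the struct above; the enclosing "placement" key and the values are
// assumptions about how jobmgr nests this section):
//
//   placement:
//     placement_dequeue_limit: 100
//     get_placements_timeout_ms: 5000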
// Processor defines the interface of placement processor
// which dequeues placed tasks and sends them to host manager via lifecyclemgr.
type Processor interface {
// Start starts the placement processor goroutines
Start() error
// Stop stops the placement processor goroutines
Stop() error
}
// processor implements the task placement Processor interface.
type processor struct {
resMgrClient resmgrsvc.ResourceManagerServiceYARPCClient
jobFactory cached.JobFactory
goalStateDriver goalstate.Driver
taskConfigV2Ops ormobjects.TaskConfigV2Ops
secretInfoOps ormobjects.SecretInfoOps
lifeCycle lifecycle.LifeCycle
hmVersion api.Version
lm lifecyclemgr.Manager
config *Config
metrics *Metrics
}
const (
// _rpcTimeout is the timeout for RPC calls to the resource manager.
_rpcTimeout = 10 * time.Second
// maxRetryCount is the maximum number of times a transient DB error will be
// retried. This is a safety mechanism to avoid the placement processor
// getting stuck retrying errors wrongly classified as transient.
maxRetryCount = 1000
// _defaultSecretInfoOpsTimeout is the default timeout for secret operations
// against Cassandra.
_defaultSecretInfoOpsTimeout = 10 * time.Second
)
// InitProcessor initializes placement processor
func InitProcessor(
d *yarpc.Dispatcher,
resMgrClientName string,
jobFactory cached.JobFactory,
goalStateDriver goalstate.Driver,
hmVersion api.Version,
ormStore *ormobjects.Store,
config *Config,
parent tally.Scope,
) Processor {
return &processor{
resMgrClient: resmgrsvc.NewResourceManagerServiceYARPCClient(d.ClientConfig(resMgrClientName)),
jobFactory: jobFactory,
goalStateDriver: goalStateDriver,
lm: lifecyclemgr.New(hmVersion, d, parent),
taskConfigV2Ops: ormobjects.NewTaskConfigV2Ops(ormStore),
secretInfoOps: ormobjects.NewSecretInfoOps(ormStore),
config: config,
metrics: NewMetrics(parent.SubScope("jobmgr").SubScope("task")),
lifeCycle: lifecycle.NewLifeCycle(),
hmVersion: hmVersion,
}
}
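// A minimal wiring sketch, not taken from production code; the dispatcher,
// client name, factory, driver, store and scope are assumed to be constructed
// elsewhere exactly as InitProcessor expects them:
//
//   proc := placement.InitProcessor(
//       dispatcher,       // *yarpc.Dispatcher with an outbound to resmgr
//       resMgrClientName, // name of that outbound (assumed defined elsewhere)
//       jobFactory,       // cached.JobFactory
//       goalStateDriver,  // goalstate.Driver
//       hmVersion,        // api.Version selecting the host manager API
//       ormStore,         // *ormobjects.Store
//       &placement.Config{PlacementDequeueLimit: 100, GetPlacementsTimeout: 5000},
//       rootScope,        // tally.Scope
//   )
//   if err := proc.Start(); err != nil {
//       log.WithError(err).Fatal("cannot start placement processor")
//   }
//   defer proc.Stop()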
// Start starts the placement processor.
func (p *processor) Start() error {
if !p.lifeCycle.Start() {
// already running
log.Warn("placement processor is already running, no action will be performed")
return nil
}
log.Info("starting placement processor")
go p.run()
log.Info("placement processor started")
return nil
}
// run dequeues and processes placements in a loop until the processor is
// stopped.
func (p *processor) run() {
for {
select {
case <-p.lifeCycle.StopCh():
p.lifeCycle.StopComplete()
return
default:
p.process()
}
}
}
// process dequeues a batch of placements and processes each placement in its
// own goroutine.
func (p *processor) process() {
placements, err := p.getPlacements()
if err != nil {
log.WithError(err).Error("jobmgr failed to dequeue placements")
return
}
select {
case <-p.lifeCycle.StopCh():
// placement dequeued but not processed
log.WithField("placements", placements).
Warn("ignoring placement after dequeue due to lost leadership")
return
default:
break
}
if len(placements) == 0 {
// Log at debug level to avoid verbose output when there is no work.
log.Debug("No placements")
return
}
ctx := context.Background()
// Process each placement in a separate goroutine.
log.WithField("placements", placements).Debug("Start processing placements")
for _, placement := range placements {
go p.processPlacement(ctx, placement)
}
}
// prepareTasksForLaunch returns the current task configuration and the
// runtime diff that needs to be patched onto the existing runtime for each
// launchable task. It returns an error only when the placement contains
// fewer selected ports than required.
func (p *processor) prepareTasksForLaunch(
ctx context.Context,
taskIDs []*mesos.TaskID,
hostname string,
agentID string,
selectedPorts []uint32,
) (
map[string]*lifecyclemgr.LaunchableTaskInfo,
[]*peloton.TaskID,
error,
) {
portsIndex := 0
taskInfos := make(map[string]*lifecyclemgr.LaunchableTaskInfo)
skippedTaskIDs := make([]*peloton.TaskID, 0)
for _, mtaskID := range taskIDs {
id, instanceID, err := util.ParseJobAndInstanceID(mtaskID.GetValue())
if err != nil {
log.WithField("mesos_task_id", mtaskID.GetValue()).
WithError(err).
Error("Failed to parse mesos task id")
continue
}
jobID := &peloton.JobID{Value: id}
ptaskID := &peloton.TaskID{
Value: util.CreatePelotonTaskID(id, instanceID),
}
ptaskIDStr := ptaskID.GetValue()
cachedJob := p.jobFactory.GetJob(jobID)
if cachedJob == nil {
skippedTaskIDs = append(skippedTaskIDs, ptaskID)
continue
}
cachedTask, err := cachedJob.AddTask(ctx, uint32(instanceID))
if err != nil {
log.WithError(err).
WithFields(log.Fields{
"job_id": jobID.GetValue(),
"instance_id": uint32(instanceID),
}).Error("cannot add and recover task from DB")
continue
}
cachedRuntime, err := cachedTask.GetRuntime(ctx)
if err != nil {
log.WithError(err).
WithFields(log.Fields{
"job_id": jobID.GetValue(),
"instance_id": uint32(instanceID),
}).Error("cannot fetch task runtime")
continue
}
if cachedRuntime.GetMesosTaskId().GetValue() != mtaskID.GetValue() {
log.WithFields(log.Fields{
"job_id": jobID.GetValue(),
"instance_id": uint32(instanceID),
"mesos_task_id": mtaskID.GetValue(),
}).Info("skipping launch of old run")
skippedTaskIDs = append(skippedTaskIDs, ptaskID)
continue
}
taskConfig, configAddOn, err := p.taskConfigV2Ops.GetTaskConfig(
ctx,
jobID,
uint32(instanceID),
cachedRuntime.GetConfigVersion())
if err != nil {
log.WithError(err).
WithField("task_id", ptaskID.GetValue()).
Error("not able to get task configuration")
continue
}
var spec *pbpod.PodSpec
if p.hmVersion.IsV1() {
// TODO: unify this call with p.taskConfigV2Ops.GetTaskConfig().
spec, err = p.taskConfigV2Ops.GetPodSpec(
ctx,
jobID,
uint32(instanceID),
cachedRuntime.GetConfigVersion())
if err != nil {
log.WithError(err).
WithField("task_id", ptaskIDStr).
Error("not able to get pod spec")
continue
}
}
runtimeDiff := make(jobmgrcommon.RuntimeDiff)
if cachedRuntime.GetGoalState() != task.TaskState_KILLED {
runtimeDiff[jobmgrcommon.HostField] = hostname
runtimeDiff[jobmgrcommon.AgentIDField] = &mesos.AgentID{
Value: &agentID,
}
runtimeDiff[jobmgrcommon.StateField] = task.TaskState_LAUNCHED
}
if selectedPorts != nil {
// Reset runtime ports to get new ports assignment if placement has ports.
ports := make(map[string]uint32)
// Assign selected dynamic port to task per port config.
for _, portConfig := range taskConfig.GetPorts() {
if portConfig.GetValue() != 0 {
// Skip static port.
continue
}
if portsIndex >= len(selectedPorts) {
// This should never happen.
log.WithFields(log.Fields{
"selected_ports": selectedPorts,
"task_id": ptaskIDStr,
}).Error("placement contains less selected ports than required.")
return nil, nil, errors.New("invalid placement")
}
ports[portConfig.GetName()] = selectedPorts[portsIndex]
portsIndex++
}
runtimeDiff[jobmgrcommon.PortsField] = ports
}
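// Worked example for the port assignment above (invented values): given port
// configs [{Name: "http", Value: 0}, {Name: "admin", Value: 8080},
// {Name: "grpc", Value: 0}] and selectedPorts [31000, 31001], the static
// "admin" port is skipped and the dynamic assignment written to the runtime
// is {"http": 31000, "grpc": 31001}.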
runtimeDiff[jobmgrcommon.MessageField] = "Add hostname and ports"
runtimeDiff[jobmgrcommon.ReasonField] = "REASON_UPDATE_OFFER"
if cachedRuntime.GetGoalState() == task.TaskState_KILLED {
if cachedRuntime.GetState() != task.TaskState_KILLED {
// Received placement for task which needs to be killed,
// retry killing the task.
p.goalStateDriver.EnqueueTask(
jobID,
uint32(instanceID),
time.Now(),
)
}
// Skip launching of killed tasks.
skippedTaskIDs = append(skippedTaskIDs, ptaskID)
log.WithField("task_id", ptaskIDStr).
Info("skipping launch of killed task")
continue
}
// Patch task runtime with the generated runtime diff.
retry := 0
for retry < maxRetryCount {
_, _, err := cachedJob.PatchTasks(
ctx,
map[uint32]jobmgrcommon.RuntimeDiff{
uint32(instanceID): runtimeDiff,
},
false,
)
if err == nil {
runtime, _ := cachedTask.GetRuntime(ctx)
taskInfos[ptaskIDStr] = &lifecyclemgr.LaunchableTaskInfo{
TaskInfo: &task.TaskInfo{
Runtime: runtime,
Config: taskConfig,
InstanceId: uint32(instanceID),
JobId: jobID,
},
ConfigAddOn: configAddOn,
Spec: spec,
}
break
}
if common.IsTransientError(err) {
// Retry, bounded by maxRetryCount, on transient DB errors.
log.WithError(err).WithFields(log.Fields{
"job_id": jobID,
"instance_id": instanceID,
}).Warn("retrying update task runtime on transient error")
retry++
continue
}
log.WithError(err).
WithFields(log.Fields{
"job_id": jobID,
"instance_id": instanceID,
}).Error("cannot process placement due to non-transient db error")
delete(taskInfos, ptaskIDStr)
break
}
}
return taskInfos, skippedTaskIDs, nil
}
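// For reference, a task that survives the loop above with a non-KILLED goal
// state is patched with a runtime diff roughly of this shape (hostname, agent
// and ports are invented):
//
//   jobmgrcommon.HostField:    "host-17",
//   jobmgrcommon.AgentIDField: &mesos.AgentID{Value: &agentID},
//   jobmgrcommon.StateField:   task.TaskState_LAUNCHED,
//   jobmgrcommon.PortsField:   map[string]uint32{"http": 31000},
//   jobmgrcommon.MessageField: "Add hostname and ports",
//   jobmgrcommon.ReasonField:  "REASON_UPDATE_OFFER",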
// processPlacement prepares the tasks in a placement for launch, launches
// them through the lifecycle manager, and enqueues the launched tasks to the
// goal state engine.
func (p *processor) processPlacement(
ctx context.Context,
placement *resmgr.Placement,
) {
var taskIDs []*mesos.TaskID
for _, t := range placement.GetTaskIDs() {
taskIDs = append(taskIDs, t.GetMesosTaskID())
}
launchableTaskInfos, skippedTaskIDs, err := p.prepareTasksForLaunch(
ctx,
taskIDs,
placement.GetHostname(),
placement.GetAgentId().GetValue(),
placement.GetPorts(),
)
if err != nil {
if newErr := p.lm.TerminateLease(
ctx,
placement.GetHostname(),
placement.GetAgentId().GetValue(),
placement.GetHostOfferID().GetValue(),
); newErr != nil {
err = errors.Wrap(err, newErr.Error())
}
// The error is not returned to the caller, so log it here.
log.WithError(err).
WithFields(log.Fields{
"placement": placement,
"tasks_total": len(taskIDs),
}).Error("failed to get launchable tasks")
p.metrics.TaskRequeuedOnLaunchFail.Inc(int64(len(taskIDs)))
return
}
// Populate secrets in place in task config of launchableTaskInfos.
skippedTaskInfos := p.populateSecrets(ctx, launchableTaskInfos)
if len(skippedTaskInfos) != 0 {
// Process the task launches that were skipped due to transient DB
// errors when fetching secrets. Continue with the rest of the launches.
p.processSkippedLaunches(ctx, skippedTaskInfos)
}
err = p.lm.Launch(
ctx,
placement.GetHostOfferID().GetValue(),
placement.GetHostname(),
placement.GetAgentId().GetValue(),
launchableTaskInfos,
nil,
)
if err != nil {
// The error is not returned to the caller, so log it here.
log.WithError(err).
WithFields(log.Fields{
"placement": placement,
"tasks_total": len(launchableTaskInfos),
}).Error("launch error, process skipped launches")
p.processSkippedLaunches(ctx, launchableTaskInfos)
return
}
p.enqueueTaskToGoalState(launchableTaskInfos)
// Kill skipped/unknown tasks. We ignore errors because that would indicate
// a channel/network error. We will retry when we can reconnect to
// resource-manager.
p.KillResManagerTasks(ctx, skippedTaskIDs)
}
// populateSecrets populates the eligible tasks with secret data. Tasks that
// hit transient errors while fetching secret data from the DB are returned
// as skipped.
func (p *processor) populateSecrets(
ctx context.Context,
taskInfos map[string]*lifecyclemgr.LaunchableTaskInfo,
) map[string]*lifecyclemgr.LaunchableTaskInfo {
skippedTaskInfos := make(map[string]*lifecyclemgr.LaunchableTaskInfo)
for id, taskInfo := range taskInfos {
// If task config has secret volumes, populate secret data in config.
err := p.populateTaskConfigWithSecrets(ctx, taskInfo.Config)
if err != nil {
if yarpcerrors.IsNotFound(errors.Cause(err)) {
// This is not retryable and we will never recover
// from this error. Mark the task runtime as KILLED
// before dropping it so that we don't try to launch it
// again. No need to enqueue to goalstate engine here.
// The caller does that for all tasks in TaskInfo.
// TODO: Notify resmgr that the state of this task
// is failed and it should not retry this task
// Need a private resmgr API for this.
if err = p.updateTaskRuntime(
ctx,
id,
task.TaskState_KILLED,
"REASON_SECRET_NOT_FOUND",
err.Error(),
); err != nil {
// Not retrying here; worst case we will attempt to launch
// this task again from the processPlacement() call, and mark
// the goal state properly in the next iteration.
log.WithError(err).WithField("task_id", id).
Error("failed to update goalstate to KILLED")
}
} else {
// Skip this task in case of transient error but add it to
// skippedTaskInfos so that the caller can ask resmgr to
// launch this task again.
log.WithError(err).
WithField("task_id", id).
Error("populateSecrets failed. skipping task")
skippedTaskInfos[id] = taskInfo
}
}
}
return skippedTaskInfos
}
// populateTaskConfigWithSecrets checks task config for secret volumes.
// If the config has volumes of type secret, it means that the Value field
// of that secret contains the secret ID. This function queries
// the DB to fetch the secret by secret ID and then replaces
// the secret Value by the fetched secret data.
// We do this to prevent secrets from being leaked as a part
// of job or task config and populate the task config with
// actual secrets just before task launch.
func (p *processor) populateTaskConfigWithSecrets(
ctx context.Context,
taskConfig *task.TaskConfig,
) error {
if taskConfig.GetContainer().GetType() != mesos.ContainerInfo_MESOS {
return nil
}
for _, volume := range taskConfig.GetContainer().GetVolumes() {
if volume.GetSource().GetType() == mesos.Volume_Source_SECRET &&
volume.GetSource().GetSecret().GetValue().GetData() != nil {
// Replace secret ID with actual secret here.
// This is done to make sure secrets are read from the DB
// when it is absolutely necessary and that they are not
// persisted in any place other than the secret_info table
// (for example as part of job/task config).
// Use a dedicated timeout for the secret read and release it as soon
// as the call returns; deferring cancel() inside the loop would hold
// every context until the function exits.
secretCtx, cancel := context.WithTimeout(
context.Background(), _defaultSecretInfoOpsTimeout)
secretID := string(
volume.GetSource().GetSecret().GetValue().GetData())
secretInfoObj, err := p.secretInfoOps.GetSecret(
secretCtx,
secretID,
)
cancel()
if err != nil {
p.metrics.TaskPopulateSecretFail.Inc(1)
return err
}
secretData, err := base64.StdEncoding.DecodeString(
secretInfoObj.Data,
)
if err != nil {
p.metrics.TaskPopulateSecretFail.Inc(1)
return err
}
volume.GetSource().GetSecret().GetValue().Data = secretData
}
}
return nil
}
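// To illustrate the swap performed above (values are invented): before this
// call a secret volume carries only the secret ID,
//
//   volume.Source.Secret.Value.Data == []byte("6d0e...secret-id")
//
// and afterwards the same field holds the base64-decoded payload fetched from
// the secret_info table,
//
//   volume.Source.Secret.Value.Data == []byte("my-plaintext-secret")
//
// Nothing else in the task config is modified.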
// updateTaskRuntime updates task runtime with goalstate, reason and message
// for the given task id.
func (p *processor) updateTaskRuntime(
ctx context.Context,
taskID string,
goalstate task.TaskState,
reason string,
message string,
) error {
runtimeDiff := jobmgrcommon.RuntimeDiff{
jobmgrcommon.GoalStateField: goalstate,
jobmgrcommon.ReasonField: reason,
jobmgrcommon.MessageField: message,
}
jobID, instanceID, err := util.ParseTaskID(taskID)
if err != nil {
return err
}
cachedJob := p.jobFactory.GetJob(&peloton.JobID{Value: jobID})
if cachedJob == nil {
return yarpcerrors.InternalErrorf("jobID %v not found in cache", jobID)
}
// Update the task in DB and cache; the caller is responsible for
// enqueueing the task to the goal state engine.
_, _, err = cachedJob.PatchTasks(
ctx,
map[uint32]jobmgrcommon.RuntimeDiff{uint32(instanceID): runtimeDiff},
false,
)
return err
}
// processSkippedLaunches tries to kill the skipped tasks in resmgr and, if
// the kill goes through, re-enqueues them to resmgr to be launched again.
func (p *processor) processSkippedLaunches(
ctx context.Context,
taskInfoMap map[string]*lifecyclemgr.LaunchableTaskInfo,
) {
var skippedTaskIDs []*peloton.TaskID
for id := range taskInfoMap {
skippedTaskIDs = append(skippedTaskIDs, &peloton.TaskID{Value: id})
}
// Kill and enqueue skipped tasks back to resmgr to launch again, instead of
// waiting for resmgr timeout.
if err := p.KillResManagerTasks(ctx, skippedTaskIDs); err == nil {
p.enqueueTasksToResMgr(ctx, taskInfoMap)
}
}
// enqueueTaskToGoalState enqueues the launched tasks and their owning jobs
// to the goal state engine.
func (p *processor) enqueueTaskToGoalState(
taskInfos map[string]*lifecyclemgr.LaunchableTaskInfo,
) {
for id := range taskInfos {
jobID, instanceID, err := util.ParseTaskID(id)
if err != nil {
log.WithError(err).
WithField("task_id", id).
Error("failed to parse the task id in placement processor")
continue
}
p.goalStateDriver.EnqueueTask(
&peloton.JobID{Value: jobID},
uint32(instanceID),
time.Now())
cachedJob := p.jobFactory.AddJob(&peloton.JobID{Value: jobID})
goalstate.EnqueueJobWithDefaultDelay(
&peloton.JobID{Value: jobID},
p.goalStateDriver,
cachedJob)
}
}
// getPlacements dequeues a batch of placements from the resource manager.
func (p *processor) getPlacements() ([]*resmgr.Placement, error) {
ctx, cancelFunc := context.WithTimeout(context.Background(), _rpcTimeout)
defer cancelFunc()
request := &resmgrsvc.GetPlacementsRequest{
Limit: uint32(p.config.PlacementDequeueLimit),
Timeout: uint32(p.config.GetPlacementsTimeout),
}
callStart := time.Now()
response, err := p.resMgrClient.GetPlacements(ctx, request)
callDuration := time.Since(callStart)
if err != nil {
p.metrics.GetPlacementFail.Inc(1)
return nil, err
}
if response.GetError() != nil {
p.metrics.GetPlacementFail.Inc(1)
return nil, errors.New(response.GetError().String())
}
if len(response.GetPlacements()) != 0 {
log.WithFields(log.Fields{
"num_placements": len(response.Placements),
"duration": callDuration.Seconds(),
}).Info("GetPlacements")
}
// TODO: turn getplacement metric into gauge so we can
// get the current get_placements counts
p.metrics.GetPlacement.Inc(int64(len(response.GetPlacements())))
p.metrics.GetPlacementsCallDuration.Record(callDuration)
return response.GetPlacements(), nil
}
// enqueueTasksToResMgr enqueues the given tasks back to resmgr so that they
// are launched again.
func (p *processor) enqueueTasksToResMgr(
ctx context.Context,
tasks map[string]*lifecyclemgr.LaunchableTaskInfo,
) (err error) {
defer func() {
if err == nil {
return
}
var taskIDs []string
for taskID := range tasks {
taskIDs = append(taskIDs, taskID)
}
log.WithError(err).WithFields(log.Fields{
"task_ids": taskIDs,
"tasks_total": len(tasks),
}).Error("failed to enqueue tasks to resmgr")
}()
if len(tasks) == 0 {
return nil
}
for _, t := range tasks {
healthState := taskutil.GetInitialHealthState(t.GetConfig())
runtimeDiff := taskutil.RegenerateMesosTaskIDDiff(
t.JobId, t.InstanceId, t.GetRuntime(), healthState)
runtimeDiff[jobmgrcommon.MessageField] = "Regenerate placement"
runtimeDiff[jobmgrcommon.ReasonField] = "REASON_HOST_REJECT_OFFER"
retry := 0
for retry < maxRetryCount {
cachedJob := p.jobFactory.AddJob(t.JobId)
_, _, err = cachedJob.PatchTasks(
ctx,
map[uint32]jobmgrcommon.RuntimeDiff{uint32(t.InstanceId): runtimeDiff},
false,
)
if err == nil {
p.goalStateDriver.EnqueueTask(t.JobId, t.InstanceId, time.Now())
goalstate.EnqueueJobWithDefaultDelay(
t.JobId, p.goalStateDriver, cachedJob)
break
}
if common.IsTransientError(err) {
log.WithError(err).WithFields(log.Fields{
"job_id": t.JobId,
"instance_id": t.InstanceId,
}).Warn("retrying update task runtime on transient error")
} else {
return err
}
retry++
}
}
return err
}
// Stop stops the placement processor and waits for the processing loop to exit.
func (p *processor) Stop() error {
if !(p.lifeCycle.Stop()) {
log.Warn("placement processor is already stopped, no action will be performed")
return nil
}
p.lifeCycle.Wait()
log.Info("placement processor stopped")
return nil
}
// KillResManagerTasks issues a kill request to the resource manager for the
// specified tasks.
func (p *processor) KillResManagerTasks(ctx context.Context,
tasks []*peloton.TaskID) error {
if len(tasks) == 0 {
return nil
}
req := &resmgrsvc.KillTasksRequest{Tasks: tasks}
ctx, cancelFunc := context.WithTimeout(ctx, _rpcTimeout)
defer cancelFunc()
resp, err := p.resMgrClient.KillTasks(ctx, req)
if err != nil {
log.WithError(err).WithField("task_list", tasks).
Error("placement: resource-manager KillTasks failed")
} else {
log.WithField("task_list", tasks).
Info("placement: killed resource-manager tasks")
if len(resp.Error) > 0 {
// Not really much we can do about these errors, just log them
log.WithField("errors", resp.Error).
Error("placement: resource-manager KillTasks errors")
}
}
return err
}