pkg/hostmgr/factory/task/task_builder.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package task

import (
"math"
"strconv"
"time"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/uber/peloton/pkg/common"
"github.com/uber/peloton/pkg/common/util"
"github.com/uber/peloton/pkg/hostmgr/scalar"
hostmgrutil "github.com/uber/peloton/pkg/hostmgr/util"
mesos "github.com/uber/peloton/.gen/mesos/v1"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"
)

const (
// PelotonJobIDLabelKey is the task label key for job ID
PelotonJobIDLabelKey = "peloton.job_id"
// PelotonInstanceIDLabelKey is the task label key for task instance ID
PelotonInstanceIDLabelKey = "peloton.instance_id"
// PelotonTaskIDLabelKey is the task label key for task ID
PelotonTaskIDLabelKey = "peloton.task_id"
// Default task kill grace period of 30 seconds
_defaultTaskKillGracePeriod = 30 * time.Second
// Default custom executor id prefix
_defaultCustomExecutorPrefix = "thermos-"
// Default custom executor name
_defaultCustomExecutorName = "AuroraExecutor"
)

var (
_pelotonRole = "peloton"
_pelotonPrinciple = "peloton"
// ErrPortMismatch is returned when a port to launch is not in the offer.
ErrPortMismatch = errors.New("port in launch not in offer")
// ErrNotEnoughResource is returned when there are not enough resources
// left to match the given task.
ErrNotEnoughResource = errors.New("not enough resources left to run task")
// ErrNotEnoughRevocableResource is returned when there are not enough
// revocable resources left to match the given task.
ErrNotEnoughRevocableResource = errors.New("not enough revocable resources left to run task")
)

// Builder helps to build launchable Mesos TaskInfo from offers and
// peloton task configs.
type Builder struct {
// A map from role -> scalar resources.
scalars map[string]scalar.Resources
revocable scalar.Resources
// A map from available port number to role name.
portToRoles map[uint32]string
}

// NewBuilder creates a new instance of Builder, which the caller can use to
// build Mesos tasks from the cached resources.
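//
// A minimal usage sketch (the offer and launchableTask variables are
// hypothetical):
//
//	builder := NewBuilder(offer.GetResources())
//	taskInfo, err := builder.Build(launchableTask)
//	if err != nil {
//		// Not enough cached resources (or ports) were left for this task.
//	}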
func NewBuilder(
resources []*mesos.Resource) *Builder {
scalars := make(map[string]scalar.Resources)
portToRoles := make(map[uint32]string)
revocable, _ := scalar.FilterMesosResources(
resources, func(r *mesos.Resource) bool {
return r.GetRevocable() != nil
})
for _, rs := range resources {
if rs.GetRevocable() != nil {
continue
}
tmp := scalar.FromMesosResource(rs)
role := rs.GetRole()
if !tmp.Empty() {
prev := scalars[role]
scalars[role] = prev.Add(tmp)
} else {
ports := util.ExtractPortSet(rs)
if len(ports) == 0 {
continue
}
for p := range ports {
portToRoles[p] = role
}
}
}
// TODO: Look into whether we need to prefer reserved (non-* role)
// or unreserved (* role).
return &Builder{
scalars: scalars,
revocable: scalar.FromMesosResources(revocable),
portToRoles: portToRoles,
}
}

// portPickResult is a helper struct holding the result of port picking.
type portPickResult struct {
// Selected port numbers by name.
// This is used to populate DiscoveryInfo.Ports
// so service discovery systems can find the task.
// This includes both static and dynamic ports.
selectedPorts map[string]uint32
// Environment variables. This includes both static and dynamic ports.
portEnvs map[string]string
// Mesos resources used. This will be passed within `TaskInfo` to Mesos
// to launch the task.
portResources []*mesos.Resource
}

// pickPorts inspects the given taskConfig, picks dynamic ports from
// available resources, and returns the selected static and dynamic ports
// as well as the necessary environment variables and resources.
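//
// A sketch of the mapping (values hypothetical): a port config named "http"
// with EnvName "HTTP_PORT", picked as dynamic port 31000, yields
//
//	pick.selectedPorts["http"] == 31000
//	pick.portEnvs["HTTP_PORT"] == "31000"
//
// assuming port 31000 was available in the cached offer resources.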
func (tb *Builder) pickPorts(
taskConfig *task.TaskConfig,
selectedDynamicPorts map[string]uint32) (*portPickResult, error) {
result := &portPickResult{
selectedPorts: make(map[string]uint32),
portEnvs: make(map[string]string),
portResources: []*mesos.Resource{},
}
if len(taskConfig.Ports) == 0 {
return result, nil
}
// Populate dynamic ports and build Mesos resource objects,
// which will be used later to launch the task.
dynamicPorts := make(map[uint32]string)
for name, port := range selectedDynamicPorts {
role, ok := tb.portToRoles[port]
if !ok {
return nil, ErrPortMismatch
}
delete(tb.portToRoles, port)
dynamicPorts[port] = role
result.selectedPorts[name] = port
}
result.portResources = append(
result.portResources,
util.CreatePortResources(dynamicPorts)...)
// Populate static ports and extra environment variables, which will be
// added to `CommandInfo` to launch the task.
for _, portConfig := range taskConfig.GetPorts() {
name := portConfig.GetName()
if len(name) == 0 {
return nil, errors.New("Empty port name in task")
}
value := portConfig.GetValue()
if value != 0 { // static port
result.selectedPorts[name] = value
}
envName := portConfig.GetEnvName()
if len(envName) == 0 {
continue
}
p := int(result.selectedPorts[name])
result.portEnvs[envName] = strconv.Itoa(p)
}
return result, nil
}

// Build is used to build a `mesos.TaskInfo` from cached resources.
// The caller can keep calling this function to build more tasks.
// It returns an error if the current instance does not have enough
// resources left (scalar now, ports in the future), or if the taskConfig
// is invalid.
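//
// A sketch of building several tasks against one builder (the tasks slice
// is hypothetical):
//
//	for _, t := range tasks {
//		info, err := builder.Build(t)
//		if err == ErrNotEnoughResource {
//			break // the cached resources cannot fit this task
//		}
//	}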
func (tb *Builder) Build(
task *hostsvc.LaunchableTask,
) (*mesos.TaskInfo, error) {
// Validation of input.
taskConfig := task.GetConfig()
if taskConfig == nil {
return nil, errors.New("TaskConfig cannot be nil")
}
taskID := task.GetTaskId()
if taskID == nil {
return nil, errors.New("taskID cannot be nil")
}
taskResources := taskConfig.Resource
if taskResources == nil {
return nil, errors.New("TaskConfig.Resource cannot be nil")
}
jobID, instanceID, err := util.ParseJobAndInstanceID(taskID.GetValue())
if err != nil {
return nil, err
}
if taskConfig.GetCommand() == nil {
return nil, errors.New("Command cannot be nil")
}
// lres is the list of resources this task needs when launched.
lres, err := tb.extractScalarResources(
taskConfig.GetResource(),
taskConfig.GetRevocable())
if err != nil {
return nil, err
}
selectedDynamicPorts := task.GetPorts()
pick, err := tb.pickPorts(taskConfig, selectedDynamicPorts)
if err != nil {
return nil, err
}
if len(pick.portResources) > 0 {
lres = append(lres, pick.portResources...)
}
mesosTask := &mesos.TaskInfo{
Name: &jobID,
TaskId: taskID,
Resources: lres,
}
tb.populateExecutorInfo(
mesosTask,
taskConfig.GetExecutor(),
taskID,
)
tb.populateKillPolicy(mesosTask, taskConfig.GetKillGracePeriodSeconds())
tb.populateDiscoveryInfo(mesosTask, pick.selectedPorts, jobID)
tb.populateCommandInfo(
mesosTask,
taskConfig.GetCommand(),
pick.portEnvs,
jobID,
instanceID,
)
tb.populateContainerInfo(mesosTask, taskConfig.GetContainer())
tb.populateLabels(mesosTask, taskConfig.GetLabels(), jobID, instanceID)
tb.populateHealthCheck(mesosTask, taskConfig.GetHealthCheck())
return mesosTask, nil
}

// populateExecutorInfo sets up the ExecutorInfo of a Mesos task and copies
// the executor data to the Mesos task if present.
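//
// For example (hypothetical task ID), a custom executor for task
// "myjob-0-abc" gets ExecutorId "thermos-myjob-0-abc", and Name
// "AuroraExecutor" when the input specifies no name.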
func (tb *Builder) populateExecutorInfo(
mesosTask *mesos.TaskInfo,
executor *mesos.ExecutorInfo,
taskID *mesos.TaskID,
) {
if executor == nil || executor.GetType() != mesos.ExecutorInfo_CUSTOM {
// Only allow the custom executor (Aurora Thermos) use case for now.
// Consider adding support for the Mesos default executor in the
// future.
return
}
// Make a deep copy of pass-through fields to avoid changing the input.
executorInfo := proto.Clone(executor).(*mesos.ExecutorInfo)
executorIDValue := _defaultCustomExecutorPrefix + taskID.GetValue()
executorInfo.ExecutorId = &mesos.ExecutorID{
Value: &executorIDValue,
}
executorName := _defaultCustomExecutorName
if executorInfo.Name == nil {
executorInfo.Name = &executorName
}
// Not populating executor overhead resource, since it is assumed to be
// included in the config provided by federation.
executorInfo.Resources = []*mesos.Resource{}
mesosTask.Executor = executorInfo
// Make a copy of the executor data since the Aurora Thermos executor
// reads data from TaskInfo.
if executorInfo.Data != nil {
mesosTask.Data = make([]byte, len(executorInfo.Data))
copy(mesosTask.Data, executorInfo.Data)
}
}

// populateCommandInfo properly sets up the CommandInfo of a Mesos task
// and populates any optional environment variables in envMap. When a
// custom executor is requested, the command is set on the ExecutorInfo
// field instead.
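//
// A sketch of the Peloton variables added for jobID "myjob" and instanceID 3
// (hypothetical values), assuming LabelKeyToEnvVarName upper-cases the key
// and replaces '.' with '_':
//
//	PELOTON_JOB_ID=myjob
//	PELOTON_INSTANCE_ID=3
//	PELOTON_TASK_ID=myjob-3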
func (tb *Builder) populateCommandInfo(
mesosTask *mesos.TaskInfo,
command *mesos.CommandInfo,
envMap map[string]string,
jobID string,
instanceID uint32,
) {
if command == nil {
// Input validation happens before this, so this is a case that
// we want a nil CommandInfo (aka custom executor case).
return
}
// Make a deep copy of pass-through fields to avoid changing the input.
commandInfo := proto.Clone(command).(*mesos.CommandInfo)
// Populate optional environment variables.
// Make sure `Environment` field is initialized.
if commandInfo.GetEnvironment().GetVariables() == nil {
commandInfo.Environment = &mesos.Environment{
Variables: []*mesos.Environment_Variable{},
}
}
pelotonEnvs := []*mesos.Environment_Variable{
{
Name: util.PtrPrintf(hostmgrutil.LabelKeyToEnvVarName(PelotonJobIDLabelKey)),
Value: &jobID,
},
{
Name: util.PtrPrintf(hostmgrutil.LabelKeyToEnvVarName(PelotonInstanceIDLabelKey)),
Value: util.PtrPrintf("%d", instanceID),
},
{
Name: util.PtrPrintf(hostmgrutil.LabelKeyToEnvVarName(PelotonTaskIDLabelKey)),
Value: util.PtrPrintf("%s-%d", jobID, instanceID),
},
}
// Add Peloton-specific environment variables.
commandInfo.Environment.Variables = append(commandInfo.Environment.Variables, pelotonEnvs...)
for name, value := range envMap {
// Make copies since taking the address of the key/value loop
// variables is unsafe.
tmpName := name
tmpValue := value
env := &mesos.Environment_Variable{
Name: &tmpName,
Value: &tmpValue,
}
commandInfo.Environment.Variables =
append(commandInfo.Environment.Variables, env)
}
if mesosTask.Executor != nil {
// We are using a custom executor.
mesosTask.Executor.Command = commandInfo
} else {
mesosTask.Command = commandInfo
}
}

// populateContainerInfo properly sets up the `ContainerInfo` field of a task.
// When a custom executor is requested, the ContainerInfo is set on the
// executor instead.
func (tb *Builder) populateContainerInfo(
mesosTask *mesos.TaskInfo,
container *mesos.ContainerInfo,
) {
if container == nil {
return
}
// Make a deep copy to avoid changing input.
containerInfo := proto.Clone(container).(*mesos.ContainerInfo)
if mesosTask.Executor != nil {
// We are using a custom executor.
mesosTask.Executor.Container = containerInfo
} else {
mesosTask.Container = containerInfo
}
}

// populateLabels properly sets up the `Labels` field of a mesos task.
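//
// A sketch of the default labels appended after any user labels, for jobID
// "myjob" and instanceID 3 (hypothetical values):
//
//	peloton.job_id=myjob
//	peloton.instance_id=3
//	peloton.task_id=myjob-3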
func (tb *Builder) populateLabels(
mesosTask *mesos.TaskInfo,
labels []*peloton.Label,
jobID string,
instanceID uint32,
) {
var mesosLabels *mesos.Labels
// Translate all peloton task labels into mesos task labels
if len(labels) == 0 {
mesosLabels = &mesos.Labels{
Labels: make([]*mesos.Label, 0, 3),
}
} else {
mesosLabels = util.ConvertLabels(labels)
}
// Append the default Mesos task labels.
mesosLabels.Labels = append(mesosLabels.Labels, &mesos.Label{
Key: util.PtrPrintf(PelotonJobIDLabelKey),
Value: &jobID,
})
mesosLabels.Labels = append(mesosLabels.Labels, &mesos.Label{
Key: util.PtrPrintf(PelotonInstanceIDLabelKey),
Value: util.PtrPrintf("%d", instanceID),
})
mesosLabels.Labels = append(mesosLabels.Labels, &mesos.Label{
Key: util.PtrPrintf(PelotonTaskIDLabelKey),
Value: util.PtrPrintf("%s-%d", jobID, instanceID),
})
mesosTask.Labels = mesosLabels
}

// populateKillPolicy populates the `KillPolicy` field of the task with the
// default or custom task kill grace period.
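//
// For example, taskKillGracePeriodSecs == 10 yields a GracePeriod of
// 10,000,000,000 nanoseconds, while 0 falls back to the 30-second default.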
func (tb *Builder) populateKillPolicy(mesosTask *mesos.TaskInfo,
taskKillGracePeriodSecs uint32) {
// Set the grace period to the default value.
gracePeriodNsec := _defaultTaskKillGracePeriod.Nanoseconds()
if taskKillGracePeriodSecs > 0 {
gracePeriod := time.Duration(taskKillGracePeriodSecs) * time.Second
gracePeriodNsec = gracePeriod.Nanoseconds()
}
mesosTask.KillPolicy = &mesos.KillPolicy{
GracePeriod: &mesos.DurationInfo{
Nanoseconds: &gracePeriodNsec,
},
}
}

// populateDiscoveryInfo populates the `DiscoveryInfo` field of the task
// so service discovery integration can find which ports the task is using.
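//
// A sketch of the result for selectedPorts = {"http": 31000} (hypothetical):
//
//	Discovery = DiscoveryInfo{
//		Name:       jobID,
//		Visibility: EXTERNAL,
//		Ports:      [{Name: "http", Number: 31000, Visibility: EXTERNAL}],
//	}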
func (tb *Builder) populateDiscoveryInfo(
mesosTask *mesos.TaskInfo,
selectedPorts map[string]uint32,
jobID string,
) {
if len(selectedPorts) == 0 {
return
}
// Visibility field on DiscoveryInfo is required.
defaultVisibility := mesos.DiscoveryInfo_EXTERNAL
portSlice := []*mesos.Port{}
for name, value := range selectedPorts {
// NOTE: we need temporary variables for both name and value,
// as taking the address of loop variables is unsafe.
tmpName := name
tmpValue := value
portSlice = append(portSlice, &mesos.Port{
Name: &tmpName,
Number: &tmpValue,
Visibility: &defaultVisibility,
// TODO: Consider adding protocol and labels.
})
}
// Note that this overrides any DiscoveryInfo set in the input.
mesosTask.Discovery = &mesos.DiscoveryInfo{
Name: &jobID,
Visibility: &defaultVisibility,
Ports: &mesos.Ports{
Ports: portSlice,
},
// TODO:
// 1. add Environment, Location, Version, and Labels;
// 2. Determine how to find this in bridge.
}
}

// populateHealthCheck properly sets up the health check part of a Mesos task.
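//
// A sketch of the COMMAND translation (values hypothetical): a config with
// command "./health.sh", an interval of 10s, and 3 max consecutive failures
// maps to
//
//	HealthCheck{
//		Type:                COMMAND,
//		IntervalSeconds:     10,
//		ConsecutiveFailures: 3,
//		Command:             CommandInfo{Shell: true, Value: "./health.sh"},
//	}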
func (tb *Builder) populateHealthCheck(
mesosTask *mesos.TaskInfo, health *task.HealthCheckConfig) {
if health == nil {
return
}
mh := &mesos.HealthCheck{}
if t := health.GetInitialIntervalSecs(); t > 0 {
tmp := float64(t)
mh.DelaySeconds = &tmp
}
if t := health.GetIntervalSecs(); t > 0 {
tmp := float64(t)
mh.IntervalSeconds = &tmp
}
if t := health.GetTimeoutSecs(); t > 0 {
tmp := float64(t)
mh.TimeoutSeconds = &tmp
}
if t := health.GetMaxConsecutiveFailures(); t > 0 {
tmp := uint32(t)
mh.ConsecutiveFailures = &tmp
}
switch health.GetType() {
case task.HealthCheckConfig_COMMAND:
cc := health.GetCommandCheck()
t := mesos.HealthCheck_COMMAND
mh.Type = &t
shell := true
value := cc.GetCommand()
cmd := &mesos.CommandInfo{
Shell: &shell,
Value: &value,
}
if !cc.GetUnshareEnvironments() {
cmd.Environment = proto.Clone(
mesosTask.GetCommand().GetEnvironment(),
).(*mesos.Environment)
}
mh.Command = cmd
case task.HealthCheckConfig_HTTP:
cc := health.GetHttpCheck()
t := mesos.HealthCheck_HTTP
mh.Type = &t
scheme := cc.GetScheme()
port := cc.GetPort()
path := cc.GetPath()
h := &mesos.HealthCheck_HTTPCheckInfo{
Scheme: &scheme,
Port: &port,
Path: &path,
}
mh.Http = h
default:
log.WithField("type", health.GetType()).
Warn("Unknown health check type")
return
}
log.WithFields(log.Fields{
"health": mh,
"task": mesosTask.GetTaskId(),
}).Debug("Populated health check for mesos task")
mesosTask.HealthCheck = mh
}

// extractScalarResources takes the necessary scalar resources from this
// instance's cached resources to construct a task, and returns an error if
// not enough resources are left.
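//
// A sketch of matching across roles (numbers hypothetical): with cached
// scalars {"*": {CPU: 2}, "peloton": {CPU: 3}} and a task requiring 4 CPUs,
// 2 CPUs may be taken from "*" and the remaining 2 from "peloton" (map
// iteration order is random), emitting one set of Mesos resources per role
// and leaving {"peloton": {CPU: 1}} cached.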
func (tb *Builder) extractScalarResources(
taskResources *task.ResourceConfig,
revocable bool) ([]*mesos.Resource, error) {
var launchResources []*mesos.Resource
requiredScalar := scalar.FromResourceConfig(taskResources)
if revocable {
// TODO: Implement a cleaner design for comparing revocable resources.
rs, err := tb.extractRevocableScalarResources(&requiredScalar)
if err != nil {
log.WithFields(log.Fields{
"resources_requested": requiredScalar,
"total_revocable_resources": tb.revocable,
}).Error("not enough revocable resources to match current task")
return nil, err
}
launchResources = append(launchResources, rs...)
}
for role, leftover := range tb.scalars {
minimum := scalar.Minimum(leftover, requiredScalar)
if minimum.Empty() {
continue
}
rs := util.CreateMesosScalarResources(map[string]float64{
common.MesosCPU: minimum.CPU,
common.MesosMem: minimum.Mem,
common.MesosDisk: minimum.Disk,
common.MesosGPU: minimum.GPU,
}, role)
launchResources = append(launchResources, rs...)
trySubtract, ok := leftover.TrySubtract(minimum)
if !ok {
msg := "Incorrect resource amount in leftover!"
log.WithFields(log.Fields{
"leftover": leftover,
"minimum": minimum,
}).Warn(msg)
return nil, errors.New(msg)
}
leftover = trySubtract
if leftover.Empty() {
delete(tb.scalars, role)
} else {
tb.scalars[role] = leftover
}
trySubtract, ok = requiredScalar.TrySubtract(minimum)
if !ok {
msg := "Incorrect resource amount in required!"
log.WithFields(log.Fields{
"required": requiredScalar,
"minimum": minimum,
}).Warn(msg)
return nil, errors.New(msg)
}
requiredScalar = trySubtract
if requiredScalar.Empty() {
break
}
}
if !requiredScalar.Empty() {
log.WithFields(log.Fields{
"task_resource_requested": scalar.FromResourceConfig(taskResources).String(),
"task_resources_left_to_match": requiredScalar.String(),
"total_revocable_resources": tb.revocable.String(),
"total_non_revocable_resources": tb.scalars,
}).Error("task resources are still left to match")
return nil, ErrNotEnoughResource
}
return launchResources, nil
}

// extractRevocableScalarResources extracts revocable resources for the
// required task resources.
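//
// The leftover CPU is rounded to two decimal places before the check, so
// floating point noise (e.g. 0.3 - 0.1 == 0.19999999999999998 in float64)
// does not skew the comparison.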
func (tb *Builder) extractRevocableScalarResources(
requiredScalar *scalar.Resources) ([]*mesos.Resource, error) {
leftover := math.Round((tb.revocable.CPU-requiredScalar.CPU)*100) / 100
if leftover >= 0 {
tb.revocable.CPU = leftover
} else {
return nil, ErrNotEnoughRevocableResource
}
rs := util.CreateMesosScalarResources(
map[string]float64{common.MesosCPU: requiredScalar.CPU},
"*")
requiredScalar.CPU = 0
for _, resource := range rs {
resource.Revocable = &mesos.Resource_RevocableInfo{}
}
return rs, nil
}