pkg/placement/models/v0/assignment.go (123 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package models_v0 import ( "time" log "github.com/sirupsen/logrus" "github.com/uber/peloton/.gen/peloton/api/v0/job" "github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc" "github.com/uber/peloton/.gen/peloton/private/resmgr" "github.com/uber/peloton/pkg/hostmgr/scalar" "github.com/uber/peloton/pkg/placement/models" "github.com/uber/peloton/pkg/placement/plugins" ) const ( _defaultMaxHosts = 1 ) // Ensure that Assignment implements the Task interface. var _ models.Task = &Assignment{} // Assignment represents the assignment of a task to a host. // One host can be used in multiple assignments. type Assignment struct { Task *TaskV0 `json:"task"` Offer models.Offer `json:"host"` PlacementFailure string } // NewAssignment will create a new empty assignment from a task. func NewAssignment(task *TaskV0) *Assignment { return &Assignment{ Task: task, } } // IncRounds increments the internal count of placement rounds. func (a *Assignment) IncRounds() { a.Task.IncRounds() } // IsPastMaxRounds returns true if the task has been through too many // placement rounds. func (a *Assignment) IsPastMaxRounds() bool { return a.Task.PastMaxRounds() } // IsPastDeadline returns true if the task is past its placement deadline. // Additionally, if the task has a preferred host and is past its host placement // deadline, it will also return true. func (a *Assignment) IsPastDeadline(now time.Time) bool { if a.PreferredHost() == "" { return a.Task.PastDeadline(now) } return a.Task.PastDesiredHostPlacementDeadline(now) } // SetPlacement sets the matching offer of this task. func (a *Assignment) SetPlacement(offer models.Offer) { a.Offer = offer } // GetPlacement returns the matching offer of this task. func (a *Assignment) GetPlacement() models.Offer { return a.Offer } // OrchestrationID returns the mesos task ID or pod name. func (a *Assignment) OrchestrationID() string { return a.Task.GetTask().GetTaskId().GetValue() } // GetPlacementFailure returns the reason why the assignment was unsuccessful func (a *Assignment) GetPlacementFailure() string { return a.PlacementFailure } // SetPlacementFailure sets the reason for the failed assignment func (a *Assignment) SetPlacementFailure(failureReason string) { a.PlacementFailure = failureReason } // Fits returns true if the given resources fit in the assignment. func (a *Assignment) Fits( resLeft scalar.Resources, portsLeft uint64, ) (scalar.Resources, uint64, bool) { resNeeded, portsUsed := a.getUsage() if portsLeft < portsUsed { log.WithFields(log.Fields{ "resmgr_task": a.GetTask().GetTask(), "num_available_ports": portsLeft, }).Debug("Insufficient ports resources.") return resLeft, portsLeft, false } remains, ok := resLeft.TrySubtract(resNeeded) if !ok { log.WithFields(log.Fields{ "remain": resLeft, "usage": resNeeded, }).Debug("Insufficient resources remain") return resLeft, portsLeft, false } return remains, portsLeft - portsUsed, true } // PelotonID returns the peloton id of the task that the assignment represents. func (a *Assignment) PelotonID() string { return a.Task.GetTask().GetId().GetValue() } // NeedsSpread returns whether this task was asked to be spread // onto the hosts in its gang. func (a *Assignment) NeedsSpread() bool { spreadStrat := job.PlacementStrategy_PLACEMENT_STRATEGY_SPREAD_JOB return a.GetTask().GetTask().GetPlacementStrategy() == spreadStrat } // PreferredHost returns the host preference for this task. func (a *Assignment) PreferredHost() string { return a.GetTask().GetTask().GetDesiredHost() } // GetPlacementNeeds returns the placement needs of this task. func (a *Assignment) GetPlacementNeeds() plugins.PlacementNeeds { rmTask := a.GetTask().GetTask() needs := plugins.PlacementNeeds{ Resources: scalar.FromResourceConfig(rmTask.GetResource()), Ports: uint64(rmTask.GetNumPorts()), Revocable: rmTask.Revocable, FDs: rmTask.GetResource().GetFdLimit(), MaxHosts: _defaultMaxHosts, HostHints: map[string]string{}, Constraint: rmTask.Constraint, } if a.PreferredHost() != "" { needs.HostHints[a.PelotonID()] = a.PreferredHost() } // To spread out tasks over hosts, request host-manager // to rank hosts randomly instead of a predictable order such // as most-loaded. if rmTask.GetPlacementStrategy() == job.PlacementStrategy_PLACEMENT_STRATEGY_SPREAD_JOB { needs.RankHint = hostsvc.FilterHint_FILTER_HINT_RANKING_RANDOM } return needs } // IsReadyForHostReservation returns true if this task is ready for host reservation. func (a *Assignment) IsReadyForHostReservation() bool { return a.GetTask().GetTask().ReadyForHostReservation } // IsRevocable returns true if the task should run on revocable resources. func (a *Assignment) IsRevocable() bool { return a.GetTask().GetTask().Revocable } // getUsage returns the resource and port usage of this assignment. func (a *Assignment) getUsage() (res scalar.Resources, ports uint64) { res = scalar.FromResourceConfig(a.Task.GetTask().GetResource()) ports = uint64(a.Task.GetTask().GetNumPorts()) return } // GetTask returns the task of the assignment. func (a *Assignment) GetTask() *TaskV0 { return a.Task } // SetTask sets the task in the assignment to the given task. func (a *Assignment) SetTask(task *TaskV0) { a.Task = task } // GetResmgrTaskV0 returns the resource manager task for this assignment. func (a *Assignment) GetResmgrTaskV0() *resmgr.Task { return a.Task.GetTask() }