pkg/hostmgr/summary/summary.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package summary
import (
"context"
"fmt"
"strings"
"sync"
"time"
mesos "github.com/uber/peloton/.gen/mesos/v1"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
halphapb "github.com/uber/peloton/.gen/peloton/api/v1alpha/host"
"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"
"github.com/uber/peloton/pkg/common/constraints"
"github.com/uber/peloton/pkg/common/util"
"github.com/uber/peloton/pkg/hostmgr/host"
"github.com/uber/peloton/pkg/hostmgr/scalar"
hmutil "github.com/uber/peloton/pkg/hostmgr/util"
"github.com/uber/peloton/pkg/hostmgr/watchevent"
"github.com/gogo/protobuf/proto"
"github.com/pborman/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/uber-go/atomic"
)
const (
unreservedRole = "*"
)
// Offer represents an offer sent from the host summary when the host is
// moved to PLACING state.
//
// Every offer sent has a unique ID, which is used to claim the offer for
// launching a task.
// The ID is reset when the host moves to READY state.
//
// The ID is not persisted; it is reset when a host manager restarts, and
// remaining in-flight tasks will have to be placed again, generating new
// IDs for the offers.
type Offer struct {
ID string
Offers []*mesos.Offer
}
// InvalidHostStatus is returned when the expected status on a hostSummary
// does not match the actual value.
type InvalidHostStatus struct {
status HostStatus
}
// Error implements error.Error.
func (e InvalidHostStatus) Error() string {
return fmt.Sprintf("Invalid status %v", e.status)
}
// HostStatus represents the status (Ready/Placing/Reserved/Held) of a host in the offer pool's cache (host -> offers).
type HostStatus int
const (
// ReadyHost represents a host ready to be used.
ReadyHost HostStatus = iota + 1
// PlacingHost represents a host being used by the placement engine.
PlacingHost
// ReservedHost represents a host reserved for tasks.
ReservedHost
// HeldHost represents a host held for tasks; used for in-place updates.
HeldHost
)
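// Taken together, the transitions implemented in this file form roughly the
// following state machine (a descriptive sketch, not an enforced invariant;
// transitions into ReservedHost are driven by CasStatus callers outside this
// file):
//
//	ReadyHost   --TryMatch-->                                PlacingHost
//	HeldHost    --TryMatch (host in filter hint)-->          PlacingHost
//	PlacingHost --ClaimForLaunch/ReturnPlacingHost/expiry--> ReadyHost or HeldHost
//	ReadyHost   --HoldForTask-->                             HeldHost
//	HeldHost    --ReleaseHoldForTask/expiry-->               ReadyHost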
// OfferType represents the type of offer in the host summary: reserved, unreserved, or all.
type OfferType int
const (
// Reserved offer type represents an offer reserved for a particular Mesos role.
Reserved OfferType = iota + 1
// Unreserved offer type is not reserved for any Mesos role, and can be used
// to launch tasks for any role if the framework has opted in to the
// MULTI_ROLE capability.
Unreserved
// All represents both reserved and unreserved offers.
All
)
const (
// hostHeldStatusTimeout is the timeout for resetting
// HeldHost status back to ReadyHost status.
hostHeldStatusTimeout = 3 * time.Minute
// emptyOfferID is used when the host is in READY or HELD state.
emptyOfferID = ""
)
// HostSummary is the core component of the host manager's internal
// data structure. It keeps track of offers in various states,
// launching cycles, and reservation information for a host.
type HostSummary interface {
// HasOffer provides a quick heuristic about whether the HostSummary has
// any unreserved READY offer.
HasOffer() bool
// HasAnyOffer returns true if the host has any offer, reserved or
// unreserved.
HasAnyOffer() bool
// GetTasks returns tasks placed or running on this host.
GetTasks() []*mesos.TaskID
// UpdateTasksOnHost updates the state of a task placed or running on
// this host. If the task state is terminal, the task is removed from
// the host's task map.
UpdateTasksOnHost(taskID string, taskState task.TaskState, taskInfo *task.TaskInfo)
// TryMatch atomically tries to match offers from the current host with
// given constraint.
TryMatch(
hostFilter *hostsvc.HostFilter,
evaluator constraints.Evaluator,
labelValues constraints.LabelValues) Match
// AddMesosOffers adds Mesos offers to the current HostSummary.
AddMesosOffers(ctx context.Context, offers []*mesos.Offer) HostStatus
// RemoveMesosOffer removes the given Mesos offer by its id, and returns
// the HostStatus and the possibly removed offer for tracking purposes.
RemoveMesosOffer(offerID, reason string) (HostStatus, *mesos.Offer)
// ClaimForLaunch releases unreserved offers for task launch.
// An optional list of task ids is provided if the host is held for
// those tasks.
ClaimForLaunch(
hostOfferID string,
launchableTasks []*hostsvc.LaunchableTask,
taskIDs ...*peloton.TaskID) (map[string]*mesos.Offer, error)
// CasStatus atomically sets the status to new value if current value is old,
// otherwise returns error.
CasStatus(old, new HostStatus) error
// UnreservedAmount returns unreserved non-revocable and revocable resources
// and current host status
UnreservedAmount() (scalar.Resources, scalar.Resources, HostStatus)
// ResetExpiredPlacingOfferStatus resets a hostSummary status from PlacingHost
// if the PlacingHost status has expired, and returns
// whether the hostSummary got reset, the resource amount of unreserved
// offers, and the held tasks released due to the expiration.
ResetExpiredPlacingOfferStatus(now time.Time) (bool, scalar.Resources, []*peloton.TaskID)
// ResetExpiredHostHeldStatus resets a hostSummary status from HeldHost
// if the HeldHost status has expired, and returns
// whether the hostSummary got reset, the resource amount of unreserved
// offers, and the held tasks released due to the expiration.
ResetExpiredHostHeldStatus(now time.Time) (bool, scalar.Resources, []*peloton.TaskID)
// GetOffers returns the offers present on this host, of type reserved,
// unreserved, or all, as a map of offer id -> offer.
GetOffers(OfferType) map[string]*mesos.Offer
// GetHostname returns the hostname of the host
GetHostname() string
// GetHostStatus returns the HostStatus of the host
GetHostStatus() HostStatus
// GetHostOfferID returns the hostOfferID of the host
GetHostOfferID() string
// HoldForTask holds the host for the specified task.
// If an error is returned, hostSummary guarantees that
// the host is not held for the task.
HoldForTask(id *peloton.TaskID) error
// ReleaseHoldForTask releases the hold of the host for the specified task.
// If an error is returned, hostSummary guarantees that
// the hold for the task is not released.
ReleaseHoldForTask(id *peloton.TaskID) error
// ReturnPlacingHost is called when the host in PLACING state is not used,
// and is returned by placement engine
ReturnPlacingHost() error
// GetHeldTask returns a slice of the tasks for which the host is held.
GetHeldTask() []*peloton.TaskID
}
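// A typical placement cycle over this interface looks roughly as follows.
// This is a hypothetical sketch: `summary`, `filter`, `evaluator`,
// `labelValues` and `launchableTasks` are assumed to be supplied by the
// offer pool and are not defined in this file.
//
//	match := summary.TryMatch(filter, evaluator, labelValues)
//	if match.Result == hostsvc.HostFilterResult_MATCH {
//		// The offer ID proves the caller owns this placing cycle.
//		offers, err := summary.ClaimForLaunch(match.Offer.ID, launchableTasks)
//		// ... launch tasks using the returned offers ...
//	}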
// offerIDgenerator produces a unique ID for a host offer.
type offerIDgenerator func() string
// uuidOfferID returns a new random (version 4) UUID as a string.
func uuidOfferID() string {
return uuid.New()
}
// hostSummary is a data struct holding offers on a particular host.
type hostSummary struct {
sync.Mutex
// hostname of the host
hostname string
// scarceResourceTypes are resources, such as GPU, that are exclusively
// reserved for tasks that explicitly require them, preventing other
// tasks from being scheduled on those hosts.
scarceResourceTypes []string
// Usage slack is the set of resources that are allocated but not fully
// used. As of now, cpus is the only resource type supported as slack.
slackResourceTypes []string
// mesos offerID -> unreserved offer
unreservedOffers map[string]*mesos.Offer
// mesos offerID -> reserved offer
reservedOffers map[string]*mesos.Offer
status HostStatus
statusPlacingOfferExpiration time.Time
hostPlacingOfferStatusTimeout time.Duration
// When the host has been matched for placing, i.e. the host status is
// PLACING or RESERVED, a unique host offer ID is generated.
// For all other host states the host offer ID is empty.
hostOfferID string
offerIDgenerator offerIDgenerator
readyCount atomic.Int32
// a map recording the tasks for which the host is held;
// key is the task id, value is the expiration time
// of the hold
heldTasks map[string]time.Time
// a map of tasks assigned to or running on this host;
// key is the mesos task id, value is the task info
tasks map[string]*task.TaskInfo
// watchProcessor notifies watchers of host summary changes
watchProcessor watchevent.WatchProcessor
}
// New returns an initialized hostSummary in ReadyHost status
func New(
scarceResourceTypes []string,
hostname string,
slackResourceTypes []string,
hostPlacingOfferStatusTimeout time.Duration,
processor watchevent.WatchProcessor,
) HostSummary {
return &hostSummary{
unreservedOffers: make(map[string]*mesos.Offer),
reservedOffers: make(map[string]*mesos.Offer),
heldTasks: make(map[string]time.Time),
tasks: make(map[string]*task.TaskInfo),
scarceResourceTypes: scarceResourceTypes,
slackResourceTypes: slackResourceTypes,
hostPlacingOfferStatusTimeout: hostPlacingOfferStatusTimeout,
status: ReadyHost,
hostname: hostname,
offerIDgenerator: uuidOfferID,
watchProcessor: processor,
}
}
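// A minimal sketch of constructing a host summary with New; the argument
// values below are illustrative assumptions, not defaults:
//
//	s := New(
//		[]string{"gpus"}, // scarce resource types
//		"some-hostname",  // hostname
//		[]string{"cpus"}, // slack resource types
//		5*time.Minute,    // placing-offer status timeout
//		processor,        // a watchevent.WatchProcessor
//	)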
// HasOffer is a lock-free heuristic for whether the HostSummary has any
// unreserved offers in READY status.
// TODO(zhitao): Create micro-benchmark to prove this is useful,
// otherwise remove it!
func (a *hostSummary) HasOffer() bool {
return a.readyCount.Load() > 0
}
// HasAnyOffer returns true if the host has any offer, reserved or
// unreserved.
func (a *hostSummary) HasAnyOffer() bool {
a.Lock()
defer a.Unlock()
return len(a.unreservedOffers) > 0 || len(a.reservedOffers) > 0
}
// GetTasks returns tasks placed or running on this host.
func (a *hostSummary) GetTasks() []*mesos.TaskID {
a.Lock()
defer a.Unlock()
var result []*mesos.TaskID
for taskID := range a.tasks {
t := taskID
result = append(result, &mesos.TaskID{Value: &t})
}
return result
}
// UpdateTasksOnHost updates the list of tasks on this host.
// It performs one of the following actions:
// - Add/update the status of a task based on recovery or the event stream.
// - Remove the task from the list if the task state is terminal.
func (a *hostSummary) UpdateTasksOnHost(
taskID string,
taskState task.TaskState,
taskInfo *task.TaskInfo) {
a.Lock()
defer a.Unlock()
switch {
case util.IsPelotonStateTerminal(taskState):
delete(a.tasks, taskID)
case taskInfo != nil:
a.tasks[taskID] = taskInfo
default:
taskInfo, ok := a.tasks[taskID]
if !ok {
return
}
taskInfo.Runtime.State = taskState
}
}
// Match represents the result of a match
type Match struct {
// The result of the match
Result hostsvc.HostFilterResult
// Offer if the match is successful
Offer *Offer
}
// isHostLimitConstraintSatisfy validates the task-to-task affinity
// constraint, which limits the number of tasks of the same service that
// can run on the same host.
func (a *hostSummary) isHostLimitConstraintSatisfy(
labelConstraint *task.LabelConstraint,
) bool {
// Aggregates label count for all tasks running on this host.
// label_key -> label_value -> count
labelsCount := make(map[string]map[string]uint32)
for _, task := range a.tasks {
labels := task.GetConfig().GetLabels()
for _, label := range labels {
key := label.GetKey()
value := label.GetValue()
if _, ok := labelsCount[key]; !ok {
labelsCount[key] = make(map[string]uint32)
}
labelsCount[key][value]++
}
}
// Checks whether label constraint requirement is met or not.
// requirement = 1 && zero task running -> satisfies the requirement
// requirement = 1 && one task running -> does not satisfy requirement
// requirement = 2 && one task running -> satisfies the requirement
label := labelConstraint.GetLabel()
if labelCountValue, ok := labelsCount[label.GetKey()]; ok {
if labelCountValue[label.GetValue()] >= labelConstraint.GetRequirement() {
return false
}
}
return true
}
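// For example, with two running tasks labeled {service: foo} and one labeled
// {service: bar}, labelsCount is {"service": {"foo": 2, "bar": 1}}; a label
// constraint on service=foo with requirement 2 is then not satisfied, while
// one on service=bar with requirement 2 is.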
// matchHostFilter determines whether given HostFilter matches
// the given map of offers.
func matchHostFilter(
offerMap map[string]*mesos.Offer,
c *hostsvc.HostFilter,
evaluator constraints.Evaluator,
labelValues constraints.LabelValues,
scalarAgentRes scalar.Resources,
scarceResourceTypes []string) hostsvc.HostFilterResult {
if len(offerMap) == 0 {
return hostsvc.HostFilterResult_NO_OFFER
}
min := c.GetResourceConstraint().GetMinimum()
if min != nil {
scalarRes := scalar.FromOfferMap(offerMap)
if c.GetResourceConstraint().GetRevocable() {
revocable, _ := scalar.FilterRevocableMesosResources(
scalar.FromOffersMapToMesosResources(offerMap))
// TODO: implement a clean design that does not hard-code cpus
scalarRevocable := scalar.FromMesosResources(revocable)
scalarRes.CPU = scalarRevocable.CPU
} else {
_, nonRevocable := scalar.FilterRevocableMesosResources(
scalar.FromOffersMapToMesosResources(offerMap))
scalarRes = scalar.FromMesosResources(nonRevocable)
}
scalarMin := scalar.FromResourceConfig(min)
if !scalarRes.Contains(scalarMin) {
return hostsvc.HostFilterResult_INSUFFICIENT_OFFER_RESOURCES
}
// Validates that scarce resource types present on the current host were
// actually requested. This prevents a task from being launched on an agent
// whose resources are a superset of the requested resource types
// (e.g. a non-GPU task on a GPU host).
// As of now, the supported scarce resource type is GPU.
for _, resourceType := range scarceResourceTypes {
if scalar.HasResourceType(scalarAgentRes, scalarMin, resourceType) {
return hostsvc.HostFilterResult_SCARCE_RESOURCES
}
}
}
// Match ports resources.
numPorts := c.GetResourceConstraint().GetNumPorts()
if numPorts > util.GetPortsNumFromOfferMap(offerMap) {
return hostsvc.HostFilterResult_INSUFFICIENT_OFFER_RESOURCES
}
// Only look at the first offer on this host, because all the offers have
// the same host attributes.
var firstOffer *mesos.Offer
for _, offer := range offerMap {
firstOffer = offer
break
}
hostname := firstOffer.GetHostname()
hc := c.GetSchedulingConstraint()
return hmutil.MatchSchedulingConstraint(
hostname, labelValues, firstOffer.GetAttributes(), hc, evaluator)
}
// TryMatch atomically tries to match offers from the current host with the
// given HostFilter.
// If the current hostSummary is matched by the given HostFilter, the
// returned Match has Result MATCH and carries the host offer, and the
// unreserved offer status for this host is marked as `PLACING`, so the host
// will not be used by another placement engine until released.
// If the current instance is not matched, the returned Match carries the
// actual mismatch reason and no offer, and the status remains unchanged.
func (a *hostSummary) TryMatch(
filter *hostsvc.HostFilter,
evaluator constraints.Evaluator,
labelValues constraints.LabelValues) Match {
a.Lock()
defer a.Unlock()
if a.status != ReadyHost && a.status != HeldHost {
return Match{
Result: hostsvc.HostFilterResult_MISMATCH_STATUS,
}
}
if !a.HasOffer() {
return Match{
Result: hostsvc.HostFilterResult_NO_OFFER,
}
}
// for a host in Held state, it is only a match if the filter
// hint contains the host
if a.status == HeldHost {
var hintFound bool
for _, hostHint := range filter.GetHint().GetHostHint() {
if hostHint.GetHostname() == a.hostname {
hintFound = true
break
}
}
if !hintFound {
return Match{
Result: hostsvc.HostFilterResult_MISMATCH_STATUS,
}
}
}
// Validate the task-to-task affinity constraint for stateless workloads
constraint := filter.GetSchedulingConstraint()
if constraint.GetType() == task.Constraint_LABEL_CONSTRAINT {
if !a.isHostLimitConstraintSatisfy(constraint.GetLabelConstraint()) {
return Match{Result: hostsvc.HostFilterResult_MISMATCH_CONSTRAINTS}
}
}
result := matchHostFilter(
a.unreservedOffers,
filter,
evaluator,
labelValues,
scalar.FromMesosResources(host.GetAgentInfo(a.GetHostname()).GetResources()),
a.scarceResourceTypes)
if result != hostsvc.HostFilterResult_MATCH {
return Match{Result: result}
}
// It's a match!
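// Clone the matched offers and trim them to the requested resource class:
// a revocable request keeps revocable resources plus non-slack regular
// resources, while a non-revocable request keeps only non-revocable
// resources.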
var offers []*mesos.Offer
for _, o := range a.unreservedOffers {
offer := proto.Clone(o).(*mesos.Offer)
if filter.GetResourceConstraint().GetRevocable() {
offer.Resources, _ = scalar.FilterMesosResources(
offer.GetResources(),
func(r *mesos.Resource) bool {
if r.GetRevocable() != nil {
return true
}
return !hmutil.IsSlackResourceType(r.GetName(), a.slackResourceTypes)
})
} else {
_, offer.Resources = scalar.FilterRevocableMesosResources(offer.GetResources())
}
offers = append(offers, offer)
}
// Setting status to `PlacingHost`: this ensures proper state
// tracking of resources on the host and also ensures offers on
// this host will not be sent to another `AcquireHostOffers`
// call before released.
err := a.casStatusLockFree(a.status, PlacingHost)
if err != nil {
return Match{
Result: hostsvc.HostFilterResult_NO_OFFER,
}
}
// Add offer to the match
return Match{
Result: hostsvc.HostFilterResult_MATCH,
Offer: &Offer{
ID: a.hostOfferID,
Offers: offers,
},
}
}
// hasLabeledReservedResources returns whether the given offer has labeled
// reserved resources.
func hasLabeledReservedResources(offer *mesos.Offer) bool {
for _, res := range offer.GetResources() {
if res.GetRole() != "" &&
res.GetRole() != unreservedRole &&
res.GetReservation().GetLabels() != nil {
return true
}
}
return false
}
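// For example, a resource offered with role "peloton" (an illustrative role
// name) and non-nil reservation labels counts as labeled reserved, while a
// resource with the default role "*" or without reservation labels does not.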
// AddMesosOffers adds Mesos offers to the current hostSummary and returns
// its status for tracking purposes.
func (a *hostSummary) AddMesosOffers(
ctx context.Context,
offers []*mesos.Offer) HostStatus {
a.Lock()
defer a.Unlock()
var offerIDs []string
for _, offer := range offers {
// filter out revocable resources whose type we don't recognize
offerID := offer.GetId().GetValue()
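// Keep a resource if it is non-revocable, or if it is revocable and its
// type is configured as slack (e.g. revocable cpus survive, while
// revocable mem is dropped when cpus is the only slack type).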
offer.Resources, _ = scalar.FilterMesosResources(
offer.Resources,
func(r *mesos.Resource) bool {
if r.GetRevocable() == nil {
return true
}
return hmutil.IsSlackResourceType(r.GetName(), a.slackResourceTypes)
})
if !hasLabeledReservedResources(offer) {
a.unreservedOffers[offerID] = offer
} else {
a.reservedOffers[offerID] = offer
}
offerIDs = append(offerIDs, offerID)
}
if a.status == ReadyHost || a.status == HeldHost {
a.readyCount.Store(int32(len(a.unreservedOffers)))
}
log.WithFields(log.Fields{
"offer_ids": strings.Join(offerIDs, ","),
"outstanding_offer_count": len(a.unreservedOffers),
"hostname": a.GetHostname(),
"host_state": a.status,
}).Info("offers added to host summary")
return a.status
}
// ClaimForLaunch atomically checks that the current hostSummary is in
// Placing status, releases offers so the caller can use them to launch
// tasks, and resets the status to ready/held.
func (a *hostSummary) ClaimForLaunch(
hostOfferID string,
launchableTasks []*hostsvc.LaunchableTask,
taskIDs ...*peloton.TaskID) (map[string]*mesos.Offer,
error) {
a.Lock()
defer a.Unlock()
if a.status != PlacingHost {
return nil, errors.New("host status is not Placing")
}
if a.hostOfferID != hostOfferID {
return nil, errors.New("host offer id does not match")
}
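// Swap out all unreserved offers to hand to the caller, leaving an empty
// map on the summary.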
result := make(map[string]*mesos.Offer)
result, a.unreservedOffers = a.unreservedOffers, result
for _, t := range launchableTasks {
taskID := t.GetTaskId().GetValue()
a.tasks[taskID] = &task.TaskInfo{
Config: t.GetConfig(),
Runtime: &task.RuntimeInfo{
State: task.TaskState_LAUNCHED,
StartTime: time.Now().Format(time.RFC3339Nano),
},
}
}
for _, taskID := range taskIDs {
a.releaseHoldForTaskLockFree(taskID)
}
// Reset status to held/ready depending on whether the host is held for
// other tasks.
newState := a.getResetStatus()
if err := a.casStatusLockFree(PlacingHost, newState); err != nil {
return nil, errors.Wrap(err,
"failed to move host to Ready/Held state")
}
return result, nil
}
// RemoveMesosOffer removes the given Mesos offer by its id, and returns
// the HostStatus and the possibly removed offer for tracking purposes.
func (a *hostSummary) RemoveMesosOffer(offerID, reason string) (HostStatus, *mesos.Offer) {
a.Lock()
defer a.Unlock()
offer, ok := a.unreservedOffers[offerID]
if !ok {
offer, ok = a.reservedOffers[offerID]
if !ok {
log.WithFields(log.Fields{
"offer": offerID,
"reason": reason,
}).Warn("Remove non-existing reserved offer.")
return a.status, offer
}
delete(a.reservedOffers, offerID)
} else {
delete(a.unreservedOffers, offerID)
a.readyCount.Dec()
}
switch a.status {
case ReadyHost:
log.WithFields(log.Fields{
"offer": offerID,
"hostname": a.GetHostname(),
"reason": reason,
}).Info("Ready offer removed")
default:
// This could trigger INVALID_OFFER error later.
log.WithFields(log.Fields{
"offer_id": offer.Id.GetValue(),
"resources_removed": scalar.FromOffer(offer),
"unreserved_resources_left": scalar.FromOfferMap(a.unreservedOffers),
"status": a.status,
"reason": reason,
}).Warn("offer removed while not in ready status")
}
return a.status, offer
}
// CasStatus atomically sets the status to new value if current value is old,
// otherwise returns error.
func (a *hostSummary) CasStatus(old, new HostStatus) error {
a.Lock()
defer a.Unlock()
return a.casStatusLockFree(old, new)
}
// notifyEvent notifies the watch processor whenever the host summary changes
func (a *hostSummary) notifyEvent() {
a.watchProcessor.NotifyEventChange(a.createHostSummaryObject())
}
// casStatusLockFree sets the status to the new value if the current value
// matches old, otherwise returns an error. The caller must hold the
// summary lock.
func (a *hostSummary) casStatusLockFree(old, new HostStatus) error {
defer a.notifyEvent()
if a.status != old {
return InvalidHostStatus{a.status}
}
a.status = new
switch a.status {
case ReadyHost:
// if it's a ready host then reset the hostOfferID
a.hostOfferID = emptyOfferID
a.readyCount.Store(int32(len(a.unreservedOffers)))
case PlacingHost:
// generate the offer id for a placing host.
a.hostOfferID = a.offerIDgenerator()
a.statusPlacingOfferExpiration = time.Now().Add(a.hostPlacingOfferStatusTimeout)
a.readyCount.Store(0)
case ReservedHost:
// generate the offer id for a reserved host.
a.hostOfferID = a.offerIDgenerator()
a.readyCount.Store(0)
case HeldHost:
a.hostOfferID = emptyOfferID
a.readyCount.Store(int32(len(a.unreservedOffers)))
}
return nil
}
// UnreservedAmount returns unreserved non-revocable and revocable resources
// and current host status
func (a *hostSummary) UnreservedAmount() (scalar.Resources, scalar.Resources, HostStatus) {
a.Lock()
defer a.Unlock()
unreservedResources := scalar.FromOffersMapToMesosResources(a.unreservedOffers)
revocable, nonRevocable := scalar.FilterRevocableMesosResources(unreservedResources)
return scalar.FromMesosResources(nonRevocable),
scalar.FromMesosResources(revocable),
a.status
}
// ResetExpiredPlacingOfferStatus resets a hostSummary status from
// PlacingHost to ReadyHost/HeldHost if the PlacingHost status has expired,
// and returns whether the hostSummary got reset, the unreserved resources
// freed, and the held tasks released due to the expiration.
func (a *hostSummary) ResetExpiredPlacingOfferStatus(now time.Time) (bool, scalar.Resources, []*peloton.TaskID) {
a.Lock()
defer a.Unlock()
var taskExpired []*peloton.TaskID
if !a.HasOffer() &&
a.status == PlacingHost &&
now.After(a.statusPlacingOfferExpiration) {
// some tasks held on the host may expire during the time,
// expire the tasks and calculate the new status
taskExpired = a.releaseExpiredHeldTask(now)
newStatus := a.getResetStatus()
log.WithFields(log.Fields{
"time": now,
"current_status": a.status,
"new_status": newStatus,
"ready_count": a.readyCount.Load(),
"offer_resources": scalar.FromOfferMap(a.unreservedOffers),
"task_expired": taskExpired,
}).Warn("reset host from placing state after timeout")
a.casStatusLockFree(PlacingHost, newStatus)
return true, scalar.FromOfferMap(a.unreservedOffers), taskExpired
}
return false, scalar.Resources{}, taskExpired
}
// ResetExpiredHostHeldStatus resets a hostSummary status from HeldHost
// to ReadyHost if the HeldHost status has expired, and returns whether the
// hostSummary got reset, the unreserved resources freed, and the held
// tasks released due to the expiration.
func (a *hostSummary) ResetExpiredHostHeldStatus(now time.Time) (bool, scalar.Resources, []*peloton.TaskID) {
a.Lock()
defer a.Unlock()
var taskExpired []*peloton.TaskID
taskExpired = a.releaseExpiredHeldTask(now)
// keep the host status if it is in PLACING/Reserved
if len(taskExpired) != 0 && a.status == HeldHost {
newStatus := a.getResetStatus()
log.WithFields(log.Fields{
"time": now,
"current_status": a.status,
"new_status": newStatus,
"ready_count": a.readyCount.Load(),
"offer_resources": scalar.FromOfferMap(a.unreservedOffers),
"task_expired": taskExpired,
}).Warn("remove expired task for host in held status after timeout")
a.casStatusLockFree(HeldHost, newStatus)
// if host is still in held state, then no resource is freed
res := scalar.Resources{}
if newStatus != HeldHost {
res = scalar.FromOfferMap(a.unreservedOffers)
}
return true, res, taskExpired
}
// reset happens, but host state is not changed,
// so no res is released
if len(taskExpired) != 0 {
return true, scalar.Resources{}, taskExpired
}
return false, scalar.Resources{}, taskExpired
}
func (a *hostSummary) releaseExpiredHeldTask(now time.Time) []*peloton.TaskID {
var taskExpired []*peloton.TaskID
for taskID, expirationTime := range a.heldTasks {
if now.After(expirationTime) {
a.releaseHoldForTaskLockFree(&peloton.TaskID{Value: taskID})
taskExpired = append(taskExpired, &peloton.TaskID{Value: taskID})
}
}
return taskExpired
}
// GetOffers returns the offers present on this host, of type reserved,
// unreserved, or all.
// Returns a map of offer id -> offer.
func (a *hostSummary) GetOffers(offertype OfferType) map[string]*mesos.Offer {
a.Lock()
defer a.Unlock()
return a.getOffers(offertype)
}
// getOffers is an unprotected method; the caller must hold the summary lock.
// It returns the offers present on this host, of type reserved, unreserved,
// or all, as a map of offer id -> offer.
func (a *hostSummary) getOffers(offertype OfferType) map[string]*mesos.Offer {
offers := make(map[string]*mesos.Offer)
switch offertype {
case Reserved:
for offerID, offer := range a.reservedOffers {
offers[offerID] = proto.Clone(offer).(*mesos.Offer)
}
case Unreserved:
for offerID, offer := range a.unreservedOffers {
offers[offerID] = proto.Clone(offer).(*mesos.Offer)
}
case All:
fallthrough
default:
offers = a.getOffers(Unreserved)
for key, value := range a.getOffers(Reserved) {
offers[key] = value
}
}
return offers
}
// GetHostname returns the hostname of the host
func (a *hostSummary) GetHostname() string {
return a.hostname
}
// GetHostStatus returns the HostStatus of the host
func (a *hostSummary) GetHostStatus() HostStatus {
a.Lock()
defer a.Unlock()
return a.status
}
// GetHostOfferID returns the hostOfferID of the host
func (a *hostSummary) GetHostOfferID() string {
// hostOfferID is mutated under the summary lock, so guard the read too.
a.Lock()
defer a.Unlock()
return a.hostOfferID
}
// HoldForTask holds the host for the specified task
func (a *hostSummary) HoldForTask(id *peloton.TaskID) error {
a.Lock()
defer a.Unlock()
if a.status == ReservedHost {
return errors.Wrap(&InvalidHostStatus{status: a.status},
"cannot change host state to Held")
}
if a.status == ReadyHost {
if err := a.casStatusLockFree(ReadyHost, HeldHost); err != nil {
return err
}
}
// for PLACING and HELD states, there is no need to change the host state
if _, ok := a.heldTasks[id.GetValue()]; !ok {
a.heldTasks[id.GetValue()] = time.Now().Add(hostHeldStatusTimeout)
}
log.WithFields(log.Fields{
"hostname": a.hostname,
"status": a.status,
"host_held": a.heldTasks,
"task_id": id.GetValue(),
}).Debug("hold task")
return nil
}
// GetHeldTask returns a slice of the tasks for which the host is held
func (a *hostSummary) GetHeldTask() []*peloton.TaskID {
a.Lock()
defer a.Unlock()
var result []*peloton.TaskID
for taskID := range a.heldTasks {
result = append(result, &peloton.TaskID{Value: taskID})
}
return result
}
// ReleaseHoldForTask releases the hold of the host for the specified task
func (a *hostSummary) ReleaseHoldForTask(id *peloton.TaskID) error {
a.Lock()
defer a.Unlock()
// try to reset the host status iff the task to be released
// is the only task for which the host is held
if len(a.heldTasks) == 1 && a.status == HeldHost {
if _, ok := a.heldTasks[id.GetValue()]; ok {
log.WithFields(log.Fields{
"hostname": a.hostname,
"task_id": id,
}).Debug("host is ready after hold release")
if err := a.casStatusLockFree(a.status, ReadyHost); err != nil {
return err
}
}
}
a.releaseHoldForTaskLockFree(id)
return nil
}
func (a *hostSummary) releaseHoldForTaskLockFree(id *peloton.TaskID) {
if _, exist := a.heldTasks[id.GetValue()]; !exist {
// this can happen for various reasons, such as a task being
// launched again on the same host after the hold timed out
log.WithFields(log.Fields{
"hostname": a.hostname,
"task_id": id.GetValue(),
}).Info("task held is not found on host")
} else {
delete(a.heldTasks, id.GetValue())
}
log.WithFields(log.Fields{
"hostname": a.hostname,
"status": a.status,
"host_held": a.heldTasks,
"task_id": id.GetValue(),
}).Debug("release task")
}
// ReturnPlacingHost is called when the host in PLACING state is not used,
// and is returned by placement engine
func (a *hostSummary) ReturnPlacingHost() error {
a.Lock()
defer a.Unlock()
if a.status != PlacingHost {
return &InvalidHostStatus{status: a.status}
}
newStatus := a.getResetStatus()
log.WithFields(log.Fields{
"new_status": newStatus,
"held_tasks": a.heldTasks,
}).Debug("return placing hosts")
return a.casStatusLockFree(PlacingHost, newStatus)
}
// getResetStatus returns the new host status for a host
// that is going to be reset from PLACING/HELD state.
func (a *hostSummary) getResetStatus() HostStatus {
newStatus := ReadyHost
if len(a.heldTasks) != 0 {
newStatus = HeldHost
}
return newStatus
}
// createHostSummaryObject builds the v1alpha host summary event payload
// sent to watchers
func (a *hostSummary) createHostSummaryObject() *halphapb.HostSummary {
obj := &halphapb.HostSummary{
Hostname: a.GetHostname(),
Offers: a.unreservedOffers,
}
return obj
}