pkg/hostmgr/offer/offerpool/pool.go

// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package offerpool

import (
	"context"
	"reflect"
	"sync"
	"time"

	mesos "github.com/uber/peloton/.gen/mesos/v1"
	sched "github.com/uber/peloton/.gen/mesos/v1/scheduler"
	"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
	"github.com/uber/peloton/.gen/peloton/api/v0/task"
	"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"

	"github.com/uber/peloton/pkg/common"
	"github.com/uber/peloton/pkg/common/constraints"
	"github.com/uber/peloton/pkg/common/util"
	"github.com/uber/peloton/pkg/hostmgr/binpacking"
	"github.com/uber/peloton/pkg/hostmgr/hostpool/manager"
	hostmgr_mesos "github.com/uber/peloton/pkg/hostmgr/mesos"
	"github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/encoding/mpb"
	"github.com/uber/peloton/pkg/hostmgr/scalar"
	"github.com/uber/peloton/pkg/hostmgr/summary"
	"github.com/uber/peloton/pkg/hostmgr/watchevent"

	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	"go.uber.org/multierr"
)

// Pool caches a set of offers received from Mesos master. It is
// currently only instantiated at the leader of Peloton masters.
type Pool interface {
	// AddOffers adds offers to the pool. It filters out non-viable offers
	// (offers with non-nil Unavailability) and returns the slice of
	// acceptable offers.
	AddOffers(context.Context, []*mesos.Offer) []*mesos.Offer

	// RescindOffer rescinds an offer from the pool, e.g. on Mesos Master
	// --offer-timeout. Returns whether the offer is found in the pool.
	RescindOffer(*mesos.OfferID) bool

	// UpdateTasksOnHost updates the task to host map for host summary.
	UpdateTasksOnHost(taskID string, taskState task.TaskState, taskInfo *task.TaskInfo)

	// RemoveExpiredOffers prunes offers from the pool once the
	// offer-hold-time has expired.
	RemoveExpiredOffers() (map[string]*TimedOffer, int)

	// Clear removes all offers in the pool.
	Clear()

	// DeclineOffers sends a decline call to Mesos Master and removes the
	// offers from the offer pool.
	DeclineOffers(ctx context.Context, offerIds []*mesos.OfferID) error

	// ClaimForPlace obtains offers from the pool conforming to the given
	// HostFilter for placement purposes.
	// The first return value is the returned offers, grouped by hostname as
	// key; the second return value is a map from hostsvc.HostFilterResult
	// to count.
	ClaimForPlace(
		ctx context.Context,
		constraint *hostsvc.HostFilter) (
		map[string]*summary.Offer,
		map[string]uint32,
		error)

	// ClaimForLaunch finds offers previously claimed for placement on the
	// given host. The difference from ClaimForPlace is that offers claimed
	// from this function are considered used and sent back to Mesos master
	// in a Launch operation, while results of `ClaimForPlace` are still
	// considered part of peloton apps.
	// An optional list of task ids is provided if the host is held for
	// the tasks.
	ClaimForLaunch(
		hostname string,
		hostOfferID string,
		launchableTasks []*hostsvc.LaunchableTask,
		taskIDs ...*peloton.TaskID) (map[string]*mesos.Offer, error)

	// ReturnUnusedOffers returns previously placed offers on hostname back
	// to the current offer pool so they can be used by future launch actions.
	ReturnUnusedOffers(hostname string) error

	// TODO: Add the following API for viewing offers, and optionally expose
	// this in a debugging endpoint.
	// View() (map[string][]*mesos.Offer, err)

	// ResetExpiredPlacingHostSummaries resets the status of each hostSummary
	// of the offerPool from PlacingOffer to ReadyOffer if the PlacingOffer
	// status has expired and returns the hostnames which got reset.
	ResetExpiredPlacingHostSummaries(now time.Time) []string

	// ResetExpiredHeldHostSummaries resets the status of each hostSummary
	// of the offerPool from HeldHost to ReadyHost if the HeldHost status has
	// expired and returns the hostnames which got reset.
	ResetExpiredHeldHostSummaries(now time.Time) []string

	// GetAllOffers returns hostOffers as map[hostname] -> map(offerID -> offer)
	// and the number of offers for the reserved, unreserved or all offer type.
	GetAllOffers() (map[string]map[string]*mesos.Offer, int)

	// RefreshGaugeMaps refreshes ready/placing metrics from all hosts.
	RefreshGaugeMaps()

	// GetHostSummary returns the host summary object for the given host name.
	GetHostSummary(hostName string) (summary.HostSummary, error)

	// GetBinPackingRanker returns the ranker associated with the offer pool.
	GetBinPackingRanker() binpacking.Ranker

	// GetHostOfferIndex returns the host to host summary mapping;
	// it makes a copy and returns the new map.
	GetHostOfferIndex() map[string]summary.HostSummary

	// GetHostSummaries returns a map of hostname to host summary object.
	GetHostSummaries(hostnames []string) (map[string]summary.HostSummary, error)

	// GetHostHeldForTask returns the host that is held for the task.
	GetHostHeldForTask(taskID *peloton.TaskID) string

	// HoldForTasks holds the host for the tasks specified.
	HoldForTasks(hostname string, taskIDs []*peloton.TaskID) error

	// ReleaseHoldForTasks releases the hold of the host for the tasks specified.
	ReleaseHoldForTasks(hostname string, taskIDs []*peloton.TaskID) error

	// SetHostPoolManager sets the host pool manager in the offer pool.
	SetHostPoolManager(manager manager.HostPoolManager)
}

const (
	// Reject offers from a host in unavailability/maintenance only within
	// 3 hours of the maintenance start time.
	// Mesos Master sets unix nanoseconds for the unavailability start time.
	_defaultRejectUnavailableOffer = int64(10800000000000) // 3 hours in nanoseconds
)

var (
	// supportedScarceResourceTypes are resource types supported to launch
	// scarce resource tasks, exclusively on scarce resource type hosts.
	// ToDo: move to lower case (match mesos resource type string)
	supportedScarceResourceTypes = []string{"GPU"}

	// supportedSlackResourceTypes are slack resource types supported by Peloton.
	supportedSlackResourceTypes = []string{common.MesosCPU}
)

// NewOfferPool creates an offerPool object and registers the
// corresponding YARPC procedures.
func NewOfferPool(
	offerHoldTime time.Duration,
	schedulerClient mpb.SchedulerClient,
	metrics *Metrics,
	frameworkInfoProvider hostmgr_mesos.FrameworkInfoProvider,
	scarceResourceTypes []string,
	slackResourceTypes []string,
	binPackingRanker binpacking.Ranker,
	hostPlacingOfferStatusTimeout time.Duration,
	processor watchevent.WatchProcessor,
	hostPoolManager manager.HostPoolManager) Pool {

	// GPU is the only supported scarce resource type.
	if !reflect.DeepEqual(supportedScarceResourceTypes, scarceResourceTypes) {
		log.WithFields(log.Fields{
			"supported_scarce_resource_types": supportedScarceResourceTypes,
			"scarce_resource_types":           scarceResourceTypes,
		}).Error("input scarce resources types are not supported")
	}

	// cpus is the only supported slack resource type.
	if !reflect.DeepEqual(supportedSlackResourceTypes, slackResourceTypes) {
		log.WithFields(log.Fields{
			"supported_slack_resource_types": supportedSlackResourceTypes,
			"slack_resource_types":           slackResourceTypes,
		}).Error("input slack resources types are not supported")
	}

	p := &offerPool{
		hostOfferIndex:                make(map[string]summary.HostSummary),
		scarceResourceTypes:           scarceResourceTypes,
		slackResourceTypes:            slackResourceTypes,
		offerHoldTime:                 offerHoldTime,
		hostPlacingOfferStatusTimeout: hostPlacingOfferStatusTimeout,
		mSchedulerClient:              schedulerClient,
		mesosFrameworkInfoProvider:    frameworkInfoProvider,
		metrics:                       metrics,
		binPackingRanker:              binPackingRanker,
		watchProcessor:                processor,
		hostPoolManager:               hostPoolManager,
	}

	return p
}

// TimedOffer contains the hostname and possible expiration time of an offer.
type TimedOffer struct {
	Hostname   string
	Expiration time.Time
}

type offerPool struct {
	sync.RWMutex

	// hostOfferIndex -- key: hostname, value: HostSummary
	hostOfferIndex map[string]summary.HostSummary

	// Inverse index from task to hostname. This map is required
	// because the event stream does not have the hostname.
	// key: taskID, value: hostname
	taskToHostMap sync.Map

	// scarce resource types, such as GPU.
	scarceResourceTypes []string

	// slack resource types, representing usage slack.
	// cpus is the only supported slack resource.
	slackResourceTypes []string

	// Map from offer id to hostname and offer expiration time.
	// Used when an offer is rescinded or pruned.
	timedOffers sync.Map

	// Time to hold an offer in the offer pool
	offerHoldTime time.Duration

	// Time to hold a host in PLACING state
	hostPlacingOfferStatusTimeout time.Duration

	mSchedulerClient           mpb.SchedulerClient
	mesosFrameworkInfoProvider hostmgr_mesos.FrameworkInfoProvider

	metrics *Metrics

	// The default ranker for hosts during filtering
	binPackingRanker binpacking.Ranker

	// taskHeldIndex -- key: task id, value: host held for the task
	taskHeldIndex sync.Map

	watchProcessor watchevent.WatchProcessor

	hostPoolManager manager.HostPoolManager
}

// ClaimForPlace obtains offers from the pool conforming to the given
// constraints. Results are grouped by hostname as key. Offers are not
// cleared from the offer pool here. The first return value is the returned
// offers, grouped by hostname as key; the second return value is a map from
// hostsvc.HostFilterResult to count.
func (p *offerPool) ClaimForPlace(
	ctx context.Context,
	hostFilter *hostsvc.HostFilter,
) (
	map[string]*summary.Offer,
	map[string]uint32,
	error) {
	p.RLock()
	defer p.RUnlock()

	matcher := NewMatcher(
		hostFilter,
		constraints.NewEvaluator(task.LabelConstraint_HOST),
		p.hostPoolManager)

	// If a host hint is provided, try to return the hosts in the hints first.
	for _, filterHints := range hostFilter.GetHint().GetHostHint() {
		if hs, ok := p.hostOfferIndex[filterHints.GetHostname()]; ok {
			if result := matcher.tryMatch(hs); result != hostsvc.HostFilterResult_MATCH {
				log.WithField("task_id", filterHints.GetTaskID().GetValue()).
					WithField("hostname", hs.GetHostname()).
					WithField("match_result", result.String()).
					Info("failed to match task with desired host")
			}
			if matcher.HasEnoughHosts() {
				break
			}
		}
	}

	// We might want to consider making this an asynchronous process if it
	// becomes a bottleneck, but that might increase defragmentation in the
	// cluster. We will start with this approach, monitor it, and optimize
	// based on the results.
	var sortedSummaryList []interface{}
	if !matcher.HasEnoughHosts() {
		sortedSummaryList = p.getRankedHostSummaryList(
			ctx,
			hostFilter.GetHint().GetRankHint(),
			p.hostOfferIndex,
		)
	}

	for _, s := range sortedSummaryList {
		// in case the ordered list contains nil values
		if s != nil {
			matcher.tryMatch(s.(summary.HostSummary))
			if matcher.HasEnoughHosts() {
				break
			}
		}
	}

	hasEnoughHosts := matcher.HasEnoughHosts()
	hostOffers, resultCount := matcher.getHostOffers()
	if !hasEnoughHosts {
		// Still proceed to return something.
		log.WithFields(log.Fields{
			"host_filter":                 hostFilter,
			"matched_host_offers_noindex": hostOffers,
			"match_result_counts":         resultCount,
		}).Debug("Not enough offers are matched to given constraints")
	}

	// NOTE: we should not clear the entries for the selected offers in
	// p.offers because we still need to visit the corresponding offers when
	// they are returned or used.
	return hostOffers, resultCount, nil
}

func (p *offerPool) getRankedHostSummaryList(
	ctx context.Context,
	rankHint hostsvc.FilterHint_Ranking,
	offerIndex map[string]summary.HostSummary,
) []interface{} {
	ranker := p.binPackingRanker
	switch rankHint {
	case hostsvc.FilterHint_FILTER_HINT_RANKING_RANDOM:
		// The FirstFit ranker iterates over the offerIndex map and returns
		// the summary for each entry. We rely on Golang's randomized map
		// iteration order to get a random sequence of host summaries.
		ranker = binpacking.GetRankerByName(binpacking.FirstFit)
	case hostsvc.FilterHint_FILTER_HINT_RANKING_LEAST_AVAILABLE_FIRST:
		// DeFrag orders hosts from lowest offered resources to most
		// offered resources.
		ranker = binpacking.GetRankerByName(binpacking.DeFrag)
	case hostsvc.FilterHint_FILTER_HINT_RANKING_LOAD_AWARE:
		// LoadAware orders hosts from lowest loaded to highest loaded.
		ranker = binpacking.GetRankerByName(binpacking.LoadAware)
	}

	return ranker.GetRankedHostList(ctx, offerIndex)
}

// ClaimForLaunch takes offers from the pool (removes them from the host
// summary) for launch.
func (p *offerPool) ClaimForLaunch(
	hostname string,
	hostOfferID string,
	launchableTasks []*hostsvc.LaunchableTask,
	taskIDs ...*peloton.TaskID,
) (map[string]*mesos.Offer, error) {
	p.RLock()
	defer p.RUnlock()

	var offerMap map[string]*mesos.Offer
	var err error

	hs, ok := p.hostOfferIndex[hostname]
	if !ok {
		return nil, errors.New("cannot find input hostname " + hostname)
	}

	offerMap, err = hs.ClaimForLaunch(hostOfferID, launchableTasks, taskIDs...)
	if err != nil {
		return nil, err
	}

	if len(offerMap) == 0 {
		return nil, errors.New("no offer found to launch task on " + hostname)
	}

	for id := range offerMap {
		if _, ok := p.timedOffers.Load(id); ok {
			// Remove offer from the offerid -> hostname map.
			p.timedOffers.Delete(id)
		} else {
			log.WithFields(log.Fields{
				"offer_id": id,
				"host":     hostname,
			}).Warn("ClaimForLaunch: OfferID not found in pool.")
		}
	}

	for _, taskID := range taskIDs {
		p.removeTaskHold(hostname, taskID)
	}

	for _, launchableTask := range launchableTasks {
		p.addTaskToHost(launchableTask.GetTaskId().GetValue(), hostname)
	}

	return offerMap, nil
}

// validateOfferUnavailability checks the unavailability of an incoming offer.
// Reject an offer if the maintenance start time is in the past.
// Reject an offer if the current time is less than 3 hours before the
// maintenance start time.
// Accept an offer if the current time is more than 3 hours before the
// maintenance start time.
func validateOfferUnavailability(offer *mesos.Offer) bool {
	if offer.GetUnavailability() != nil {
		currentTime := time.Now().UnixNano()
		unavailabilityStartTime := offer.Unavailability.Start.GetNanoseconds()
		if (unavailabilityStartTime-currentTime > 0 &&
			unavailabilityStartTime-currentTime < _defaultRejectUnavailableOffer) ||
			(currentTime-unavailabilityStartTime >= 0) {
			return true
		}
	}
	return false
}

// AddOffers is a callback event when Mesos Master sends offers.
func (p *offerPool) AddOffers(
	ctx context.Context,
	offers []*mesos.Offer) []*mesos.Offer {
	var acceptableOffers []*mesos.Offer
	var unavailableOffers []*mesos.OfferID
	hostnameToOffers := make(map[string][]*mesos.Offer)

	for _, offer := range offers {
		if validateOfferUnavailability(offer) {
			unavailableOffers = append(unavailableOffers, offer.Id)
			continue
		}

		p.timedOffers.Store(offer.Id.GetValue(), &TimedOffer{
			Hostname:   offer.GetHostname(),
			Expiration: time.Now().Add(p.offerHoldTime),
		})

		oldOffers := hostnameToOffers[offer.GetHostname()]
		hostnameToOffers[offer.GetHostname()] = append(oldOffers, offer)
		acceptableOffers = append(acceptableOffers, offer)
	}

	p.metrics.UnavailableOffers.Inc(int64(len(unavailableOffers)))
	p.metrics.AcceptableOffers.Inc(int64(len(acceptableOffers)))

	// Decline unavailable offers.
	if len(unavailableOffers) > 0 {
		log.
			WithField("unavailable_offers", unavailableOffers).
			Debug("Offer unavailable due to maintenance on these hosts.")
		p.DeclineOffers(ctx, unavailableOffers)
	}

	log.
		WithField("acceptable_offers", acceptableOffers).
		Debug("Acceptable offers.")

	p.Lock()
	for hostname := range hostnameToOffers {
		p.addHostSummary(hostname)
	}
	p.Unlock()

	var wg sync.WaitGroup
	for hostname, offers := range hostnameToOffers {
		wg.Add(1)
		go func(hostname string, offers []*mesos.Offer) {
			defer wg.Done()

			p.RLock()
			p.hostOfferIndex[hostname].AddMesosOffers(ctx, offers)
			p.RUnlock()
		}(hostname, offers)
	}
	wg.Wait()

	return acceptableOffers
}

// addHostSummary is a helper function to create a HostSummary for the
// provided hostname if it does not exist.
func (p *offerPool) addHostSummary(hostname string) {
	_, ok := p.hostOfferIndex[hostname]
	if !ok {
		hs := summary.New(
			p.scarceResourceTypes,
			hostname,
			p.slackResourceTypes,
			p.hostPlacingOfferStatusTimeout,
			p.watchProcessor)
		p.hostOfferIndex[hostname] = hs
	}
}

// UpdateTasksOnHost updates tasks assigned or running on a host.
// A task is assigned for the first time on recovery or placement.
// On receiving a non-terminal event, the task state is updated; on
// receiving a terminal event, the task is removed from the host.
func (p *offerPool) UpdateTasksOnHost(
	taskID string,
	taskState task.TaskState,
	taskInfo *task.TaskInfo) {
	p.Lock()
	defer p.Unlock()

	var hostname string
	// on recovery, taskinfo is non-nil
	if taskInfo != nil {
		hostname = taskInfo.GetRuntime().GetHost()
		if len(hostname) == 0 || util.IsPelotonStateTerminal(taskState) {
			return
		}
		p.addHostSummary(hostname)
		p.addTaskToHost(taskID, hostname)
	} else {
		// the else branch is evaluated on receiving a Mesos task status
		// update event.
		value, ok := p.taskToHostMap.Load(taskID)
		if !ok {
			return
		}
		hostname = value.(string)
	}

	p.hostOfferIndex[hostname].UpdateTasksOnHost(taskID, taskState, taskInfo)
	if util.IsPelotonStateTerminal(taskState) {
		p.removeTaskToHost(taskID)
	}
}

// removeOffer is a helper method to remove an offer from timedOffers and
// hostSummary.
func (p *offerPool) removeOffer(offerID, reason string) {
	offer, ok := p.timedOffers.Load(offerID)
	if !ok {
		log.
			WithField("offer_id", offerID).
Info("offer not found in pool.") return } p.timedOffers.Delete(offerID) // Remove offer from hostOffers hostName := offer.(*TimedOffer).Hostname hostOffers, ok := p.hostOfferIndex[hostName] if !ok { log.WithFields(log.Fields{ "host": hostName, "offer_id": offerID, }).Warn("host not found in hostOfferIndex") } else { hostOffers.RemoveMesosOffer(offerID, reason) } } // RescindOffer is a callback event when Mesos Master rescinds a offer. // Reasons for offer rescind can be lost agent, offer_timeout and more. func (p *offerPool) RescindOffer(offerID *mesos.OfferID) bool { p.RLock() defer p.RUnlock() oID := *offerID.Value p.metrics.RescindEvents.Inc(1) p.removeOffer(oID, "offer is rescinded.") return true } // RemoveExpiredOffers removes offers which are expired from pool // and return the list of removed mesos offer ids and their location as well as // how many valid offers are still left. func (p *offerPool) RemoveExpiredOffers() (map[string]*TimedOffer, int) { p.RLock() defer p.RUnlock() offersToDecline := map[string]*TimedOffer{} p.timedOffers.Range(func(offerID, timedOffer interface{}) bool { if time.Now().After(timedOffer.(*TimedOffer).Expiration) { log. WithField("offer_id", offerID). Info("Removing expired offer from pool.") offersToDecline[offerID.(string)] = timedOffer.(*TimedOffer) } return true }) // Remove the expired offers from hostOfferIndex if len(offersToDecline) > 0 { p.metrics.ExpiredOffers.Inc(int64(len(offersToDecline))) for offerID := range offersToDecline { p.removeOffer(offerID, "offer is expired.") } } length := 0 p.timedOffers.Range(func(_, _ interface{}) bool { length++ return true }) return offersToDecline, length } // Clear removes all offers from pool. func (p *offerPool) Clear() { p.RLock() defer p.RUnlock() log.Info("Clean up offerpool.") p.timedOffers.Range(func(key interface{}, value interface{}) bool { p.timedOffers.Delete(key) return true }) p.hostOfferIndex = map[string]summary.HostSummary{} } // DeclineOffers calls mesos master to decline list of offers func (p *offerPool) DeclineOffers( ctx context.Context, offerIDs []*mesos.OfferID) error { p.RLock() defer p.RUnlock() callType := sched.Call_DECLINE msg := &sched.Call{ FrameworkId: p.mesosFrameworkInfoProvider.GetFrameworkID(ctx), Type: &callType, Decline: &sched.Call_Decline{ OfferIds: offerIDs, }, } msid := p.mesosFrameworkInfoProvider.GetMesosStreamID(ctx) err := p.mSchedulerClient.Call(msid, msg) if err != nil { // Ideally, we assume that Mesos has offer_timeout configured, // so in the event that offer declining call fails, offers // should eventually be invalidated by Mesos. log.WithError(err). WithField("call", msg). Warn("Failed to decline offers.") p.metrics.DeclineFail.Inc(1) return err } p.metrics.Decline.Inc(int64(len(offerIDs))) for _, offerID := range offerIDs { p.removeOffer(*offerID.Value, "offer is declined") } return nil } // ReturnUnusedOffers returns resources previously sent to placement engine // back to ready state. func (p *offerPool) ReturnUnusedOffers(hostname string) error { p.RLock() defer p.RUnlock() hostOffers, ok := p.hostOfferIndex[hostname] if !ok { log.WithField("host", hostname). 
Warn("Offers returned to pool but not found, maybe pruned?") return nil } err := hostOffers.ReturnPlacingHost() if err != nil { return err } p.metrics.ReturnUnusedHosts.Inc(1) return nil } // ResetExpiredPlacingHostSummaries resets the status of each hostSummary of the // offerPool from PlacingOffer to ReadyOffer if the PlacingOffer status has // expired and returns the hostnames which got reset func (p *offerPool) ResetExpiredPlacingHostSummaries(now time.Time) []string { p.RLock() defer p.RUnlock() var resetHostnames []string for hostname, summ := range p.hostOfferIndex { if reset, res, taskExpired := summ.ResetExpiredPlacingOfferStatus(now); reset { resetHostnames = append(resetHostnames, hostname) for _, task := range taskExpired { p.removeTaskHold(hostname, task) } p.metrics.ResetExpiredPlacingHosts.Inc(1) log.WithFields(log.Fields{ "host": hostname, "summary": summ, "delta": res, }).Info("reset expired host summaries in PLACING state.") } } return resetHostnames } // ResetExpiredHeldHostSummaries resets the status of each hostSummary of the // offerPool from HeldHost to ReadyOffer if the PlacingOffer status has // expired and returns the hostnames which got reset func (p *offerPool) ResetExpiredHeldHostSummaries(now time.Time) []string { p.RLock() defer p.RUnlock() var resetHostnames []string for hostname, summ := range p.hostOfferIndex { reset, res, taskExpired := summ.ResetExpiredHostHeldStatus(now) if reset { resetHostnames = append(resetHostnames, hostname) p.metrics.ResetExpiredHeldHosts.Inc(1) log.WithFields(log.Fields{ "host": hostname, "summary": summ, "delta": res, }).Info("reset expired host summaries in HELD state.") } for _, task := range taskExpired { p.removeTaskHold(hostname, task) } } return resetHostnames } // GetAllOffers returns all hostOffers in the pool as: // map[hostname] -> map(offerid -> offers // and #offers for reserved, unreserved or all offer types. func (p *offerPool) GetAllOffers() (map[string]map[string]*mesos.Offer, int) { p.RLock() defer p.RUnlock() hostOffers := make(map[string]map[string]*mesos.Offer) var offersCount int for hostname, s := range p.hostOfferIndex { offers := s.GetOffers(summary.All) hostOffers[hostname] = offers offersCount += len(offers) } return hostOffers, offersCount } // RefreshGaugeMaps refreshes the metrics for hosts in ready and placing state. 
func (p *offerPool) RefreshGaugeMaps() {
	p.RLock()
	defer p.RUnlock()

	ready := scalar.Resources{}
	readyRevocable := scalar.Resources{}
	readyHosts := float64(0)

	placing := scalar.Resources{}
	placingRevocable := scalar.Resources{}
	placingHosts := float64(0)

	for _, h := range p.hostOfferIndex {
		nonRevocableAmount, revocableAmount, status := h.UnreservedAmount()
		switch status {
		case summary.ReadyHost:
			ready = ready.Add(nonRevocableAmount)
			readyRevocable = readyRevocable.Add(revocableAmount)
			readyHosts++
		case summary.PlacingHost:
			placing = placing.Add(nonRevocableAmount)
			placingRevocable = placingRevocable.Add(revocableAmount)
			placingHosts++
		}
	}

	p.metrics.Ready.Update(ready)
	p.metrics.ReadyRevocable.Update(readyRevocable)
	p.metrics.ReadyHosts.Update(readyHosts)
	p.metrics.Placing.Update(placing)
	p.metrics.PlacingRevocable.Update(placingRevocable)
	p.metrics.PlacingHosts.Update(placingHosts)
	p.metrics.AvailableHosts.Update(readyHosts + placingHosts)
}

// GetHostSummary returns the host summary object for the given host name.
func (p *offerPool) GetHostSummary(hostname string) (summary.HostSummary, error) {
	p.RLock()
	defer p.RUnlock()
	if _, ok := p.hostOfferIndex[hostname]; !ok {
		return nil, errors.Errorf("hostname %s does not have any offers", hostname)
	}
	return p.hostOfferIndex[hostname], nil
}

// GetBinPackingRanker returns the ranker associated with the offer pool.
func (p *offerPool) GetBinPackingRanker() binpacking.Ranker {
	return p.binPackingRanker
}

// GetHostOfferIndex returns the host to host summary mapping;
// it makes a copy and returns the new map.
func (p *offerPool) GetHostOfferIndex() map[string]summary.HostSummary {
	p.RLock()
	defer p.RUnlock()
	dest := make(map[string]summary.HostSummary)
	for k, v := range p.hostOfferIndex {
		dest[k] = v
	}
	return dest
}

// GetHostSummaries returns a map of hostname to host summary object.
func (p *offerPool) GetHostSummaries(
	hostnames []string) (map[string]summary.HostSummary, error) {
	p.RLock()
	defer p.RUnlock()

	hostSummaries := make(map[string]summary.HostSummary)

	if len(hostnames) > 0 {
		for _, hostname := range hostnames {
			if offerSummary, ok := p.hostOfferIndex[hostname]; ok {
				hostSummaries[hostname] = offerSummary
			}
		}
	} else {
		for hostname, offerSummary := range p.hostOfferIndex {
			hostSummaries[hostname] = offerSummary
		}
	}

	return hostSummaries, nil
}

// GetHostHeldForTask returns the host that is held for the task.
func (p *offerPool) GetHostHeldForTask(taskID *peloton.TaskID) string {
	val, ok := p.taskHeldIndex.Load(taskID.GetValue())
	if !ok {
		return ""
	}
	return val.(string)
}

// HoldForTasks holds the host for the tasks specified.
func (p *offerPool) HoldForTasks(hostname string, taskIDs []*peloton.TaskID) error {
	hs, err := p.GetHostSummary(hostname)
	if err != nil {
		return err
	}

	var errs []error
	for _, taskID := range taskIDs {
		if err := hs.HoldForTask(taskID); err != nil {
			errs = append(errs, err)
		} else {
			p.addTaskHold(hostname, taskID)
		}
	}

	if len(errs) != 0 {
		return multierr.Combine(errs...)
	}

	return nil
}

// ReleaseHoldForTasks releases the hold of the host for the tasks specified.
func (p *offerPool) ReleaseHoldForTasks(hostname string, taskIDs []*peloton.TaskID) error {
	hs, err := p.GetHostSummary(hostname)
	if err != nil {
		return err
	}

	var errs []error
	for _, taskID := range taskIDs {
		if err := hs.ReleaseHoldForTask(taskID); err != nil {
			errs = append(errs, err)
		} else {
			p.removeTaskHold(hostname, taskID)
		}
	}

	if len(errs) != 0 {
		return multierr.Combine(errs...)
	}

	return nil
}

// SetHostPoolManager sets the host pool manager in the offer pool.
func (p *offerPool) SetHostPoolManager(manager manager.HostPoolManager) {
	p.hostPoolManager = manager
}

// addTaskHold updates the index when a host is held for a task.
func (p *offerPool) addTaskHold(hostname string, id *peloton.TaskID) {
	oldHost, loaded := p.taskHeldIndex.LoadOrStore(id.GetValue(), hostname)
	if loaded && oldHost != hostname {
		log.WithFields(log.Fields{
			"new_host": hostname,
			"old_host": oldHost.(string),
			"task_id":  id.GetValue(),
		}).Warn("task is held by multiple hosts")
		p.taskHeldIndex.Store(id.GetValue(), hostname)
	}
}

// removeTaskHold updates the index when the hold of a host is removed
// for a task.
func (p *offerPool) removeTaskHold(hostname string, id *peloton.TaskID) {
	p.taskHeldIndex.Delete(id.GetValue())
}

// addTaskToHost records the hostname a task is assigned to in the
// task -> host inverse index.
func (p *offerPool) addTaskToHost(taskID, hostname string) {
	p.taskToHostMap.LoadOrStore(taskID, hostname)
}

// removeTaskToHost removes a task from the task -> host inverse index.
func (p *offerPool) removeTaskToHost(taskID string) {
	p.taskToHostMap.Delete(taskID)
}
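
// The sketch below is not part of the original file; it is a minimal,
// hypothetical illustration of how a caller might drive the hold/claim
// life cycle through the Pool interface, assuming an already-constructed
// pool. The function name and parameters (hostname, hostOfferID, tasks,
// taskIDs) are placeholders, and the recovery path shown is only one
// possible choice.
func exampleHoldAndClaimForLaunch(
	pool Pool,
	hostname string,
	hostOfferID string,
	tasks []*hostsvc.LaunchableTask,
	taskIDs []*peloton.TaskID,
) (map[string]*mesos.Offer, error) {
	// Hold the host so it is kept for these tasks until launch.
	if err := pool.HoldForTasks(hostname, taskIDs); err != nil {
		return nil, err
	}

	// Claim the previously placed offers on the host; the returned offers
	// are considered used and would be sent to Mesos in a Launch operation.
	offers, err := pool.ClaimForLaunch(hostname, hostOfferID, tasks, taskIDs...)
	if err != nil {
		// On failure, release the hold and return the unused offers so the
		// host can move back to the READY state.
		return nil, multierr.Combine(
			err,
			pool.ReleaseHoldForTasks(hostname, taskIDs),
			pool.ReturnUnusedOffers(hostname),
		)
	}
	return offers, nil
}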