pkg/hostmgr/p2k/hostcache/hostcache.go

// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hostcache

import (
	"sync"
	"time"

	"github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton"
	pbpod "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
	"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"
	hostmgr "github.com/uber/peloton/.gen/peloton/private/hostmgr/v1alpha"
	"github.com/uber/peloton/pkg/common/api"
	"github.com/uber/peloton/pkg/common/background"
	"github.com/uber/peloton/pkg/common/lifecycle"
	"github.com/uber/peloton/pkg/common/util"
	"github.com/uber/peloton/pkg/hostmgr/models"
	"github.com/uber/peloton/pkg/hostmgr/p2k/hostcache/hostsummary"
	"github.com/uber/peloton/pkg/hostmgr/p2k/scalar"
	hmscalar "github.com/uber/peloton/pkg/hostmgr/scalar"

	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	uatomic "github.com/uber-go/atomic"
	"github.com/uber-go/tally"
	"go.uber.org/multierr"
	"go.uber.org/yarpc/yarpcerrors"
)

const (
	_hostCacheMetricsRefresh       = "hostCacheMetricsRefresh"
	_hostCacheMetricsRefreshPeriod = 10 * time.Second
	_hostCachePruneHeldHosts       = "hostCachePruneHeldHosts"
	_hostCachePruneHeldHostsPeriod = 180 * time.Second
)

// HostCache manages cluster resources, and provides the necessary abstractions
// to interact with the underlying system.
type HostCache interface {
	// AcquireLeases acquires leases on hosts that match the filter constraints.
	AcquireLeases(hostFilter *hostmgr.HostFilter) ([]*hostmgr.HostLease, map[string]uint32)

	// TerminateLease is called when the lease is not going to be used, and we
	// want to release the lock on the host.
	TerminateLease(hostname string, leaseID string) error

	// CompleteLease is called when launching pods on a host that has been
	// previously leased to the Placement engine.
	CompleteLease(hostname string, leaseID string, podToSpecMap map[string]*pbpod.PodSpec) error

	// GetClusterCapacity gets the total capacity and allocation of the cluster.
	GetClusterCapacity() (capacity, allocation hmscalar.Resources)

	// Start will start the goroutine that listens for host events.
	Start()

	// Stop will stop the host cache goroutine that listens for host events.
	Stop()

	// GetSummaries returns a list of host summaries that the host cache is
	// managing.
	GetSummaries() (summaries []hostsummary.HostSummary)

	// HandlePodEvent is called by the pod events manager on receiving a pod
	// event.
	HandlePodEvent(event *scalar.PodEvent)

	// ResetExpiredHeldHostSummaries resets the status of each hostSummary if
	// the holds have expired and returns the hostnames which got reset.
	ResetExpiredHeldHostSummaries(now time.Time) []string

	// GetHostHeldForPod returns the host that is held for the pod.
	GetHostHeldForPod(podID *peloton.PodID) string

	// HoldForPods holds the host for the pods specified.
	HoldForPods(hostname string, podIDs []*peloton.PodID) error

	// ReleaseHoldForPods releases the hold of the host for the pods specified.
	ReleaseHoldForPods(hostname string, podIDs []*peloton.PodID) error

	// CompleteLaunchPod is called when a pod is successfully launched.
	// This is for things like removing ports allocated to the pod from the
	// available ports. This is called after the successful launch of an
	// individual pod. We cannot do this in CompleteLease.
	// For example, ports should not be removed after a failed launch,
	// otherwise the ports are leaked.
	CompleteLaunchPod(hostname string, pod *models.LaunchablePod) error

	// RecoverPodInfoOnHost updates the info of pods running on a particular
	// host. It is used only when hostsummary needs to recover the info
	// upon restart.
	RecoverPodInfoOnHost(
		id *peloton.PodID,
		hostname string,
		state pbpod.PodState,
		spec *pbpod.PodSpec,
	)

	// AddPodsToHost is a temporary method to add host entries to the host
	// cache. It will be removed once CompleteLease is used when launching
	// pods.
	AddPodsToHost(tasks []*hostsvc.LaunchableTask, hostname string)
}

// hostCache is an implementation of the HostCache interface.
type hostCache struct {
	mu sync.RWMutex

	// Map of hostname to HostSummary.
	hostIndex map[string]hostsummary.HostSummary

	// Map of podID to host held.
	podHeldIndex map[string]string

	// The event channel on which the underlying cluster manager plugin will
	// send host events to the host cache.
	hostEventCh chan *scalar.HostEvent

	// Lifecycle manager.
	lifecycle lifecycle.LifeCycle

	// Background manager.
	backgroundMgr background.Manager

	// Metrics.
	metrics *Metrics
}

// New returns a new instance of host cache.
func New(
	hostEventCh chan *scalar.HostEvent,
	backgroundMgr background.Manager,
	parent tally.Scope,
) HostCache {
	return &hostCache{
		hostIndex:     make(map[string]hostsummary.HostSummary),
		podHeldIndex:  make(map[string]string),
		hostEventCh:   hostEventCh,
		lifecycle:     lifecycle.NewLifeCycle(),
		metrics:       NewMetrics(parent),
		backgroundMgr: backgroundMgr,
	}
}

func (c *hostCache) GetSummaries() []hostsummary.HostSummary {
	c.mu.RLock()
	defer c.mu.RUnlock()

	summaries := make([]hostsummary.HostSummary, 0, len(c.hostIndex))
	for _, summary := range c.hostIndex {
		summaries = append(summaries, summary)
	}
	return summaries
}

// AcquireLeases acquires leases on hosts that match the filter constraints.
// The lease will be held until Jobmgr actively launches pods using the leaseID.
// Returns:
//	[]*hostmgr.HostLease: list of leases acquired on matching hosts.
//	map[string]uint32: map from filter result string (e.g. HOST_FILTER_INVALID)
//	to the number of hosts per result, for debugging purposes.
func (c *hostCache) AcquireLeases(
	hostFilter *hostmgr.HostFilter,
) ([]*hostmgr.HostLease, map[string]uint32) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	matcher := hostsummary.NewMatcher(hostFilter)

	// If a host hint is provided, try to return the hosts in the hints first.
	for _, filterHints := range hostFilter.GetHint().GetHostHint() {
		if hs, ok := c.hostIndex[filterHints.GetHostname()]; ok {
			matcher.TryMatch(hs.GetHostname(), hs)
			if matcher.HostLimitReached() {
				break
			}
		}
	}

	// TODO: implement defrag/firstfit ranker; for now default to first fit.
	for hostname, hs := range c.hostIndex {
		matcher.TryMatch(hostname, hs)
		if matcher.HostLimitReached() {
			break
		}
	}

	var hostLeases []*hostmgr.HostLease
	hostLimitReached := matcher.HostLimitReached()
	for _, hostname := range matcher.GetHostNames() {
		hs := c.hostIndex[hostname]
		hostLeases = append(hostLeases, hs.GetHostLease())
	}

	if !hostLimitReached {
		// Still proceed to return something.
		log.WithFields(log.Fields{
			"host_filter":         hostFilter,
			"matched_host_leases": hostLeases,
			"match_result_counts": matcher.GetFilterCounts(),
		}).Debug("Number of hosts matched is fewer than max hosts")
	}

	return hostLeases, matcher.GetFilterCounts()
}

// TerminateLease is called when a previously acquired lease, and the host it
// locked, are no longer in use. The leaseID of the acquired host should be
// supplied in this call so that the hostcache can match the leaseID.
// At this point, the existing lease is terminated and the host can be used for
// further placement.
// Error cases:
//	- LeaseID doesn't match.
//	- Host is not in Placing status.
func (c *hostCache) TerminateLease(
	hostname string,
	leaseID string,
) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	hs, err := c.getSummary(hostname)
	if err != nil {
		return err
	}
	if err := hs.TerminateLease(leaseID); err != nil {
		// TODO: metrics
		return err
	}
	return nil
}

// CompleteLease is called when launching pods on a host that has been
// previously leased to the Placement engine. The leaseID of the acquired host
// should be supplied in this call so that the hostcache can match the leaseID,
// verify that sufficient resources are present on the host to launch all the
// pods in podToSpecMap, and then allow the pods to be launched on this host.
// At this point, the existing lease is completed and the host can be used for
// further placement.
// Error cases:
//	- LeaseID doesn't match.
//	- Host is not in Placing status.
//	- There are insufficient resources on the requested host.
func (c *hostCache) CompleteLease(
	hostname string,
	leaseID string,
	podToSpecMap map[string]*pbpod.PodSpec,
) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	hs, err := c.getSummary(hostname)
	if err != nil {
		return err
	}
	if err := hs.CompleteLease(leaseID, podToSpecMap); err != nil {
		// TODO: metrics
		return err
	}
	// TODO: remove held hosts.
	return nil
}

// GetClusterCapacity gets the total cluster capacity and allocation.
func (c *hostCache) GetClusterCapacity() (
	capacity, allocation hmscalar.Resources,
) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// Go through the hostIndex, calculate capacity and allocation, and sum
	// them up to get these at a cluster level.
	// TODO: do this for slack resources too.
	for _, hs := range c.hostIndex {
		capacity = capacity.Add(hs.GetCapacity().NonSlack)
		allocation = allocation.Add(hs.GetAllocated().NonSlack)
	}
	return
}

// ResetExpiredHeldHostSummaries resets the status of each hostSummary if
// the holds have expired and returns the hostnames which got reset.
func (c *hostCache) ResetExpiredHeldHostSummaries(deadline time.Time) []string {
	c.mu.Lock()
	defer c.mu.Unlock()

	var pruned []string
	for hostname, hs := range c.hostIndex {
		isFreed, _, podIDExpired := hs.DeleteExpiredHolds(deadline)
		if isFreed {
			pruned = append(pruned, hostname)
		}
		for _, id := range podIDExpired {
			c.removePodHold(id)
		}
	}
	log.WithField("hosts", pruned).Debug("Hosts pruned")
	return pruned
}

func (c *hostCache) GetHostHeldForPod(podID *peloton.PodID) string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	hn, ok := c.podHeldIndex[podID.GetValue()]
	if !ok {
		// TODO: this should return an error. But keep it the same way as in
		// offerpool for now.
return "" } return hn } func (c *hostCache) HoldForPods(hostname string, podIDs []*peloton.PodID) error { c.mu.Lock() defer c.mu.Unlock() hs, err := c.getSummary(hostname) if err != nil { return err } var errs []error for _, id := range podIDs { if err := hs.HoldForPod(id); err != nil { errs = append(errs, err) continue } c.addPodHold(hostname, id) } if len(errs) > 0 { return yarpcerrors.InternalErrorf("failed to hold pods: %s", multierr.Combine(errs...)) } return nil } func (c *hostCache) ReleaseHoldForPods(hostname string, podIDs []*peloton.PodID) error { c.mu.Lock() defer c.mu.Unlock() hs, err := c.getSummary(hostname) if err != nil { return err } for _, id := range podIDs { hs.ReleaseHoldForPod(id) c.removePodHold(id) } return nil } // RefreshMetrics refreshes the metrics for hosts in ready and placing state. func (c *hostCache) RefreshMetrics() { totalAvailable := hmscalar.Resources{} totalAllocated := hmscalar.Resources{} readyHosts := float64(0) placingHosts := float64(0) heldHosts := float64(0) hosts := c.GetSummaries() for _, h := range hosts { allocated, capacity := h.GetAllocated(), h.GetCapacity() available, _ := capacity.NonSlack.TrySubtract(allocated.NonSlack) totalAllocated = totalAllocated.Add(allocated.NonSlack) totalAvailable = totalAvailable.Add(available) switch h.GetHostStatus() { case hostsummary.ReadyHost: readyHosts++ case hostsummary.PlacingHost: placingHosts++ } if len(h.GetHeldPods()) > 0 { heldHosts++ } } c.metrics.Available.Update(totalAvailable) c.metrics.Allocated.Update(totalAllocated) c.metrics.ReadyHosts.Update(readyHosts) c.metrics.PlacingHosts.Update(placingHosts) c.metrics.HeldHosts.Update(heldHosts) c.metrics.AvailableHosts.Update(float64(len(hosts))) } // addPodHold add a pod to podHeldIndex. Replace the old host if exists. func (c *hostCache) addPodHold(hostname string, id *peloton.PodID) { old, ok := c.podHeldIndex[id.GetValue()] if ok && old != hostname { log.WithFields(log.Fields{ "new_host": hostname, "old_host": old, "task_id": id.GetValue(), }).Warn("pod is held by multiple hosts") } c.podHeldIndex[id.GetValue()] = hostname } // removePodHold deletes id from podHeldIndex regardless of hostname. func (c *hostCache) removePodHold(id *peloton.PodID) { delete(c.podHeldIndex, id.GetValue()) } func (c *hostCache) CompleteLaunchPod(hostname string, pod *models.LaunchablePod) error { c.mu.RLock() defer c.mu.RUnlock() hs, err := c.getSummary(hostname) if err != nil { return errors.Wrapf(err, "cannot find host %q", hostname) } hs.CompleteLaunchPod(pod) return nil } // getSummary returns host summary given name. If the host does not exist, // return error not found. func (c *hostCache) getSummary(hostname string) (hostsummary.HostSummary, error) { hs, ok := c.hostIndex[hostname] if !ok { // TODO: metrics return nil, yarpcerrors.NotFoundErrorf("cannot find host %s in cache", hostname) } return hs, nil } // waitForHostEvents will start a goroutine that waits on the host events // channel. The underlying plugin will send events to this channel when any // underlying host status changes. Example: allocated resources change, // host goes down or is put in maintenance mode. 
func (c *hostCache) waitForHostEvents() {
	for {
		select {
		case event := <-c.hostEventCh:
			switch event.GetEventType() {
			case scalar.AddHost:
				c.addHost(event)
			case scalar.UpdateHostSpec:
				c.updateHostSpec(event)
			case scalar.DeleteHost:
				c.deleteHost(event)
			case scalar.UpdateHostAvailableRes:
				c.updateHostAvailable(event)
			case scalar.UpdateAgent:
				c.updateAgent(event)
			}
		case <-c.lifecycle.StopCh():
			return
		}
	}
}

// HandlePodEvent handles a pod event sent by the pod events manager. This is
// relevant only in the case of K8s, where we do resource accounting based on
// pod events. Example: a pod delete event will lead to giving the pod's
// resources back to the host.
func (c *hostCache) HandlePodEvent(event *scalar.PodEvent) {
	// TODO: evaluate locking strategy.
	c.mu.Lock()
	defer c.mu.Unlock()

	hostname := event.Event.GetHostname()
	summary, found := c.hostIndex[hostname]
	if !found {
		// TODO(pourchet): Figure out how to handle this.
		// This could happen if reconciliation was not done before
		// we start processing pod events.
		log.WithFields(log.Fields{
			"hostname": hostname,
			"pod_id":   event.Event.GetPodId().GetValue(),
		}).Error("delete pod event ignored: host summary not found")
		return
	}
	summary.HandlePodEvent(event)
}

func (c *hostCache) addHost(event *scalar.HostEvent) {
	// TODO: evaluate locking strategy.
	c.mu.Lock()
	defer c.mu.Unlock()

	hostInfo := event.GetHostInfo()
	version := hostInfo.GetResourceVersion()
	capacity := hostInfo.GetCapacity()

	// Check if the host already exists in the cache and reject the event if it
	// is of an older version.
	if existing, ok := c.hostIndex[hostInfo.GetHostName()]; ok {
		evtVersion := hostInfo.GetResourceVersion()
		// Check if event has older resource version, ignore if it does.
		currentVersion := existing.GetVersion()
		if scalar.IsOldVersion(currentVersion, evtVersion) {
			log.WithFields(log.Fields{
				"hostname":        hostInfo.GetHostName(),
				"capacity":        capacity,
				"event_version":   evtVersion,
				"current_version": currentVersion,
			}).Debug("ignore add event")
			return
		}
	}

	// TODO: figure out how to differentiate mesos/k8s hosts;
	// for now addHost is only used by k8s hosts.
	c.hostIndex[hostInfo.GetHostName()] = hostsummary.NewKubeletHostSummary(
		hostInfo.GetHostName(),
		capacity,
		version,
	)
	log.WithFields(log.Fields{
		"hostname": hostInfo.GetHostName(),
		"capacity": hostInfo.GetCapacity(),
		"version":  version,
	}).Debug("add host to cache")
}

func (c *hostCache) updateHostSpec(event *scalar.HostEvent) {
	c.mu.Lock()
	defer c.mu.Unlock()

	var hs hostsummary.HostSummary
	var ok bool

	hostInfo := event.GetHostInfo()
	evtVersion := hostInfo.GetResourceVersion()
	capacity := hostInfo.GetCapacity()
	if hs, ok = c.hostIndex[hostInfo.GetHostName()]; !ok {
		// Host not found, possibly an out-of-order event during host
		// maintenance, due to the host being removed from host manager before
		// the API server.
		// If for some reason a host was indeed missing, it will be added via
		// reconcile logic.
		log.WithFields(log.Fields{
			"hostname":      hostInfo.GetHostName(),
			"capacity":      capacity,
			"event_version": evtVersion,
		}).Debug("ignore update event, host not found in cache")
		return
	}

	// Check if event has older resource version, ignore if it does.
	currentVersion := hs.GetVersion()
	if scalar.IsOldVersion(currentVersion, evtVersion) {
		log.WithFields(log.Fields{
			"hostname":        hostInfo.GetHostName(),
			"capacity":        capacity,
			"event_version":   evtVersion,
			"current_version": currentVersion,
		}).Debug("ignore update event")
		return
	}

	hs.SetCapacity(capacity)
	hs.SetVersion(evtVersion)
	log.WithFields(log.Fields{
		"hostname": hostInfo.GetHostName(),
		"capacity": hostInfo.GetCapacity(),
		"version":  evtVersion,
	}).Debug("update host in cache")
}

func (c *hostCache) deleteHost(event *scalar.HostEvent) {
	c.mu.Lock()
	defer c.mu.Unlock()

	hostInfo := event.GetHostInfo()
	version := hostInfo.GetResourceVersion()

	// Check if the host already exists in the cache and reject the event if it
	// is of an older version.
	if existing, ok := c.hostIndex[hostInfo.GetHostName()]; ok {
		evtVersion := hostInfo.GetResourceVersion()
		// Check if event has older resource version, ignore if it does.
		currentVersion := existing.GetVersion()
		if scalar.IsOldVersion(currentVersion, evtVersion) {
			log.WithFields(log.Fields{
				"hostname":        hostInfo.GetHostName(),
				"event_version":   evtVersion,
				"current_version": currentVersion,
			}).Debug("ignore delete event")
			return
		}
	}

	delete(c.hostIndex, hostInfo.GetHostName())
	log.WithFields(log.Fields{
		"hostname": hostInfo.GetHostName(),
		"capacity": hostInfo.GetCapacity(),
		"version":  version,
	}).Debug("delete host from cache")
}

// updateAgent is only applicable to mesos.
func (c *hostCache) updateAgent(event *scalar.HostEvent) {
	c.mu.Lock()
	defer c.mu.Unlock()

	var hs hostsummary.HostSummary
	var ok bool

	hostInfo := event.GetHostInfo()
	evtVersion := hostInfo.GetResourceVersion()
	hs, ok = c.hostIndex[hostInfo.GetHostName()]
	if !ok {
		hs = hostsummary.NewMesosHostSummary(hostInfo.GetHostName())
		c.hostIndex[hostInfo.GetHostName()] = hs
	}

	hs.SetCapacity(hostInfo.GetCapacity())
	hs.SetVersion(evtVersion)
	log.WithFields(log.Fields{
		"hostname":  hostInfo.GetHostName(),
		"available": hostInfo.GetAvailable(),
		"version":   evtVersion,
	}).Debug("update agent info in host cache")
}

// updateHostAvailable is only applicable to mesos.
func (c *hostCache) updateHostAvailable(event *scalar.HostEvent) {
	c.mu.Lock()
	defer c.mu.Unlock()

	var hs hostsummary.HostSummary
	var ok bool

	hostInfo := event.GetHostInfo()
	evtVersion := hostInfo.GetResourceVersion()
	hs, ok = c.hostIndex[hostInfo.GetHostName()]
	if !ok {
		hs = hostsummary.NewMesosHostSummary(hostInfo.GetHostName())
		c.hostIndex[hostInfo.GetHostName()] = hs
	}

	hs.SetAvailable(hostInfo.GetAvailable())
	hs.SetVersion(evtVersion)
	log.WithFields(log.Fields{
		"hostname":  hostInfo.GetHostName(),
		"available": hostInfo.GetAvailable(),
		"version":   evtVersion,
	}).Debug("update host in cache")
}

// Start will start the goroutine that listens for host events.
func (c *hostCache) Start() {
	if !c.lifecycle.Start() {
		return
	}

	c.backgroundMgr.RegisterWorks(
		background.Work{
			Name: _hostCacheMetricsRefresh,
			Func: func(_ *uatomic.Bool) {
				c.RefreshMetrics()
			},
			Period: _hostCacheMetricsRefreshPeriod,
		},
	)
	c.backgroundMgr.RegisterWorks(
		background.Work{
			Name: _hostCachePruneHeldHosts,
			Func: func(_ *uatomic.Bool) {
				c.ResetExpiredHeldHostSummaries(time.Now())
			},
			Period: _hostCachePruneHeldHostsPeriod,
		},
	)

	go c.waitForHostEvents()
	log.Warn("hostCache started")
}

// Stop will stop the host cache goroutine that listens for host events.
func (c *hostCache) Stop() {
	if !c.lifecycle.Stop() {
		return
	}
	// Wait for drainer to be stopped.
	c.lifecycle.Wait()
	log.Info("hostCache stopped")
}

// Reconcile explicitly reconciles host cache.
func (c *hostCache) Reconcile() error {
	// TODO: Implement.
	return nil
}

// RecoverPodInfoOnHost updates the info of pods running on a particular host.
// It is used only when hostsummary needs to recover the info upon restart.
func (c *hostCache) RecoverPodInfoOnHost(
	id *peloton.PodID,
	hostname string,
	state pbpod.PodState,
	spec *pbpod.PodSpec,
) {
	c.mu.Lock()
	defer c.mu.Unlock()

	var hs hostsummary.HostSummary
	hs, ok := c.hostIndex[hostname]
	if !ok {
		if spec.GetMesosSpec() != nil {
			hs = hostsummary.NewMesosHostSummary(hostname)
		} else {
			// TODO: populate capacity and version correctly.
			hs = hostsummary.NewKubeletHostSummary(hostname, models.HostResources{}, "")
		}
		c.hostIndex[hostname] = hs
	}

	hs.RecoverPodInfo(id, state, spec)
}

// AddPodsToHost is a temporary method to add host entries to the host cache.
// It will be removed once CompleteLease is used when launching pods.
func (c *hostCache) AddPodsToHost(tasks []*hostsvc.LaunchableTask, hostname string) {
	for _, lt := range tasks {
		jobID, instanceID, err := util.ParseJobAndInstanceID(lt.GetTaskId().GetValue())
		if err != nil {
			log.WithFields(log.Fields{
				"mesos_id": lt.GetTaskId().GetValue(),
			}).WithError(err).Error("fail to parse ID when RecoverPodInfoOnHost in LaunchTask")
			continue
		}
		c.RecoverPodInfoOnHost(
			util.CreatePodIDFromMesosTaskID(lt.GetTaskId()),
			hostname,
			pbpod.PodState_POD_STATE_LAUNCHED,
			api.ConvertTaskConfigToPodSpec(lt.GetConfig(), jobID, instanceID),
		)
	}
}
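
// newWiredHostCache is an illustrative sketch of how a caller could construct
// and start the host cache; the function name, the channel buffer size, and
// the use of tally.NoopScope are hypothetical choices for the sketch, and the
// background manager is assumed to be created elsewhere by the host manager
// setup.
func newWiredHostCache(backgroundMgr background.Manager) (HostCache, chan *scalar.HostEvent) {
	// Buffered channel on which the cluster manager plugin publishes host
	// events; the host cache drains it once Start is called.
	hostEventCh := make(chan *scalar.HostEvent, 1000)

	cache := New(hostEventCh, backgroundMgr, tally.NoopScope)

	// Start registers the metrics-refresh and held-host-pruning background
	// works and spawns the event-listening goroutine.
	cache.Start()
	return cache, hostEventCh
}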