pkg/hostmgr/hostpool/manager/manager.go (415 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package manager

import (
	"context"
	"sync"
	"time"

	pb_host "github.com/uber/peloton/.gen/peloton/api/v0/host"
	pb_eventstream "github.com/uber/peloton/.gen/peloton/private/eventstream"
	"github.com/uber/peloton/pkg/common"
	"github.com/uber/peloton/pkg/common/constraints"
	"github.com/uber/peloton/pkg/common/eventstream"
	"github.com/uber/peloton/pkg/common/lifecycle"
	"github.com/uber/peloton/pkg/hostmgr/host"
	"github.com/uber/peloton/pkg/hostmgr/hostpool"
	"github.com/uber/peloton/pkg/storage/objects"

	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	"github.com/uber-go/tally"
	"go.uber.org/yarpc/yarpcerrors"
)

const (
	// Time interval between periodic reconciliation.
	// New falls back to this value when given a non-positive interval.
	_defaultReconcileInterval = 10 * time.Second

	// Timeout for calls to DB. Each DB call made by the manager gets its own
	// context bounded by this timeout.
	_defaultDBTimeout = 10 * time.Second

	// Error message to log on failures to update host pool in DB.
	_updateHostPoolErrMsg = "failed to update host pool in host_infos table"
)

// HostPoolManager provides abstraction to manage host pools of a cluster.
// Pool membership is held in an in-memory cache that is reconciled with the
// host_infos table in DB.
type HostPoolManager interface {
	// GetPool gets a host pool from cache by given pool id.
	// It returns error if no pool with that id exists.
	GetPool(poolID string) (hostpool.HostPool, error)

	// Pools returns all host pools from cache, keyed by pool id.
	Pools() map[string]hostpool.HostPool

	// GetPoolByHostname returns the host pool given host belongs to.
	GetPoolByHostname(hostname string) (hostpool.HostPool, error)

	// RegisterPool creates a host pool with given ID if not exists.
	RegisterPool(poolID string)

	// DeregisterPool deletes existing host pool with given ID.
	DeregisterPool(poolID string) error

	// ChangeHostPool changes host pool of given host from source pool to
	// destination pool.
	ChangeHostPool(hostname, srcPool, destPool string) error

	// GetDesiredPool gets desired pool of given host in db.
	GetDesiredPool(hostname string) (string, error)

	// UpdateDesiredPool updates desired pool of given host in db.
	UpdateDesiredPool(hostname, poolID string) error

	// Start starts the host pool cache go routine that reconciles host pools.
	// It returns error if failed to recover host pool data from db.
	Start() error

	// Stop stops the host pool cache go routine that reconciles host pools.
	Stop()
}

// hostPoolManager implements HostPoolManager interface.
// It ensures:
//   - host pool cache is consistent with db.
//   - host pool cache is consistent with host cache in host manager
//     for recovery from restart etc.
//   - every host in the cluster belongs to, and only belongs to ONE host pool.
//
// TODO: Use new host cache once it merges with agent map.
type hostPoolManager struct {
	// mu guards poolIndex and hostToPoolMap; every method in this file
	// acquires it before reading or writing those maps.
	mu sync.RWMutex

	// reconcileInternal defines how frequently host pool manager reconciles
	// host pool cache.
	// NOTE: "Internal" is a long-standing misspelling of "Interval"; the name
	// is kept because New and Start refer to it.
	reconcileInternal time.Duration

	// event stream handler used to publish host pool change events.
	eventStreamHandler *eventstream.Handler

	// hostInfoOps is the interface used to read and update host_infos table.
	hostInfoOps objects.HostInfoOps

	// poolIndex is map from host pool id to host pool.
	poolIndex map[string]hostpool.HostPool

	// hostToPoolMap is map from hostname to id of host pool it belongs to.
	hostToPoolMap map[string]string

	// Lifecycle manager of the periodic reconcile goroutine.
	lifecycle lifecycle.LifeCycle

	// Parent scope of host pool manager metrics. It is also the scope
	// handed to every host pool created by the manager.
	parentScope tally.Scope

	// Metrics.
	metrics *Metrics
}

// New returns a host pool manager instance.
func New( reconcileInternal time.Duration, eventStreamHandler *eventstream.Handler, hostInfoOps objects.HostInfoOps, parentScope tally.Scope, ) HostPoolManager { // If provided reconcile interval is less or equal to zero, // use default reconcile interval. if reconcileInternal <= 0 { reconcileInternal = _defaultReconcileInterval } manager := &hostPoolManager{ reconcileInternal: reconcileInternal, eventStreamHandler: eventStreamHandler, hostInfoOps: hostInfoOps, poolIndex: make(map[string]hostpool.HostPool), hostToPoolMap: make(map[string]string), lifecycle: lifecycle.NewLifeCycle(), parentScope: parentScope, metrics: NewMetrics(parentScope), } // Register default host pool when constructing new host pool manager. manager.RegisterPool(common.DefaultHostPoolID) return manager } // GetPool gets a host pool from cache by given pool id. // It returns error if a host pool with given pool id doesn't exist. func (m *hostPoolManager) GetPool(poolID string) (hostpool.HostPool, error) { m.mu.RLock() defer m.mu.RUnlock() pool, ok := m.poolIndex[poolID] if !ok { m.metrics.GetPoolErr.Inc(1) return nil, errors.Errorf("host pool %s not found", poolID) } return pool, nil } // Pools returns all host pools from cache. func (m *hostPoolManager) Pools() map[string]hostpool.HostPool { m.mu.RLock() defer m.mu.RUnlock() pools := make(map[string]hostpool.HostPool) for id, pool := range m.poolIndex { pools[id] = pool } return pools } // GetPoolByHostname returns the host pool given host belongs to. // It returns error if host doesn't exist in hostToPoolMap or // the host pool with looked up host pool ID doesn't exist in poolIndex. 
func (m *hostPoolManager) GetPoolByHostname(hostname string) (hostpool.HostPool, error) { m.mu.RLock() defer m.mu.RUnlock() poolID, ok := m.hostToPoolMap[hostname] if !ok { m.metrics.GetPoolByHostnameErr.Inc(1) return nil, errors.Errorf("host %s not found", hostname) } pool, ok := m.poolIndex[poolID] if !ok { // This shouldn't happen since host pool manager should ensure // poolIndex is always in-sync with hostToPoolMap. m.metrics.GetPoolByHostnameErr.Inc(1) return nil, errors.Errorf("host pool %s not found", poolID) } return pool, nil } // RegisterPool creates a host pool with given ID if not exists. // If a host pool with given pool id already exists, it is a no-op. // If a host pool with given pool id doesn't exist, it creates an empty host pool. func (m *hostPoolManager) RegisterPool(poolID string) { m.mu.Lock() defer m.mu.Unlock() if _, ok := m.poolIndex[poolID]; !ok { m.poolIndex[poolID] = hostpool.New(poolID, m.parentScope) log.WithField(hostpool.HostPoolKey, poolID). Info("Registered new host pool") } else { log.WithField(hostpool.HostPoolKey, poolID). Warn("Host pool already registered") } } // updatePoolInHostInfo updates current & desired pool on given host // in host_infos table. func (m *hostPoolManager) updatePoolInHostInfo(hostname, poolID string) error { if len(hostname) == 0 { m.metrics.UpdateCurrentPoolErr.Inc(1) return errors.New("hostname is empty") } ctx, cancel := context.WithTimeout(context.Background(), _defaultDBTimeout) defer cancel() err := m.hostInfoOps.UpdatePool( ctx, hostname, poolID, poolID) if err != nil { m.metrics.UpdateCurrentPoolErr.Inc(1) return err } log.WithFields(log.Fields{ hostpool.HostnameKey: hostname, hostpool.HostPoolKey: poolID}). Info("Changed pool for host") return nil } // DeregisterPool deletes existing host pool with given ID. // It returns error if callers tries to delete default host pool. // It returns error if default host pool doesn't exist. 
// If a host pool with given pool id already exists, it deletes the pool // and moves hosts in the deleted pool to default pool. // If a host pool with given pool id doesn't exist, it is a no-op. func (m *hostPoolManager) DeregisterPool(poolID string) error { if poolID == common.DefaultHostPoolID { m.metrics.DeregisterPoolErr.Inc(1) return yarpcerrors.InvalidArgumentErrorf( "can't delete %s host pool", common.DefaultHostPoolID) } m.mu.Lock() defer m.mu.Unlock() if pool, ok := m.poolIndex[poolID]; !ok { log.WithField(hostpool.HostPoolKey, poolID). Warn("Host pool not found") } else { // move hosts to default pool defaultPool, ok := m.poolIndex[common.DefaultHostPoolID] if !ok { m.metrics.DeregisterPoolErr.Inc(1) return yarpcerrors.InternalErrorf( "%s host pool not found", common.DefaultHostPoolID) } for h := range pool.Hosts() { err := m.updatePoolInHostInfo(h, common.DefaultHostPoolID) if err != nil { return yarpcerrors.InternalErrorf( "failed to update current pool of %s in db: %v", h, err, ) } m.hostToPoolMap[h] = common.DefaultHostPoolID pool.Delete(h) defaultPool.Add(h) m.publishPoolEvent(h, common.DefaultHostPoolID) } delete(m.poolIndex, poolID) log.WithField(hostpool.HostPoolKey, poolID). Info("Deleted existing host pool") if agentMap := host.GetAgentMap(); agentMap == nil { log.Warn("Failed to get agent-map in DeregisterPool") } else { defaultPool.RefreshCapacity(agentMap.HostCapacities) } } return nil } // ChangeHostPool changes host pool of given host from source pool to // destination pool. // If either source pool or destination pool doesn't exist, it returns error. // If host is not in source pool, fails the move attempt for that host. // TODO: Add more implementation after required hostInfo store change is done. 
func (m *hostPoolManager) ChangeHostPool( hostname, srcPoolID, destPoolID string, ) (err error) { m.mu.Lock() defer func() { if err != nil { m.metrics.ChangeHostPoolErr.Inc(1) } m.mu.Unlock() }() poolID, ok := m.hostToPoolMap[hostname] if !ok { err = yarpcerrors.NotFoundErrorf("host not found") return } if poolID != srcPoolID { err = yarpcerrors.InvalidArgumentErrorf("source pool mismatch") return } srcPool, ok := m.poolIndex[poolID] if !ok { err = yarpcerrors.InternalErrorf("src pool not found") return } if srcPoolID == destPoolID { return } destPool, ok := m.poolIndex[destPoolID] if !ok { err = yarpcerrors.InvalidArgumentErrorf("invalid dest pool") return } if err = m.updatePoolInHostInfo(hostname, destPoolID); err != nil { return } m.hostToPoolMap[hostname] = destPoolID srcPool.Delete(hostname) destPool.Add(hostname) if agentMap := host.GetAgentMap(); agentMap == nil { log.Warn("Failed to get agent-map in ChangeHostPool") } else { srcPool.RefreshCapacity(agentMap.HostCapacities) destPool.RefreshCapacity(agentMap.HostCapacities) } m.publishPoolEvent(hostname, destPoolID) return } // GetDesiredPool gets desired pool of given host in db. func (m *hostPoolManager) GetDesiredPool(hostname string) (string, error) { if len(hostname) == 0 { m.metrics.GetDesirePoolErr.Inc(1) return "", errors.New("hostname is empty") } ctx, cancel := context.WithTimeout(context.Background(), _defaultDBTimeout) defer cancel() hostInfo, err := m.hostInfoOps.Get(ctx, hostname) if err != nil { m.metrics.GetDesirePoolErr.Inc(1) return "", err } return hostInfo.DesiredPool, nil } // UpdateDesiredPool updates desired pool of given host in db. // It returns error if either hostname or poolID is empty. 
func (m *hostPoolManager) UpdateDesiredPool(hostname, poolID string) error { if len(hostname) == 0 { m.metrics.UpdateDesiredPoolErr.Inc(1) return errors.New("hostname is empty") } if len(poolID) == 0 { m.metrics.UpdateDesiredPoolErr.Inc(1) return errors.New("poolID is empty") } ctx, cancel := context.WithTimeout(context.Background(), _defaultDBTimeout) defer cancel() err := m.hostInfoOps.UpdateDesiredPool(ctx, hostname, poolID) if err != nil { m.metrics.UpdateDesiredPoolErr.Inc(1) return err } return nil } func (m *hostPoolManager) publishPoolEvent(hostname, poolID string) { poolEvent := &pb_eventstream.Event{ Type: pb_eventstream.Event_HOST_EVENT, HostEvent: &pb_host.HostEvent{ Hostname: hostname, Type: pb_host.HostEvent_TYPE_HOST_POOL, HostPoolEvent: &pb_host.HostPoolEvent{ Pool: poolID, }, }, } m.eventStreamHandler.AddEvent(poolEvent) } // Start starts the host pool cache go routine that reconciles host pools. // It runs periodical reconciliation. // It returns error if failed to recover host pool data from db. func (m *hostPoolManager) Start() error { if !m.lifecycle.Start() { return nil } log.Info("Starting host pool manager") // Recover host pool data from db. if err := m.reconcile(); err != nil { return err } // Start goroutine to periodically reconcile host pool cache. go func() { defer m.lifecycle.StopComplete() ticker := time.NewTicker(m.reconcileInternal) defer ticker.Stop() for { select { case <-ticker.C: if err := m.reconcile(); err != nil { log.Error(err) } case <-m.lifecycle.StopCh(): return } } }() return nil } // Stop stops the host pool cache go routine that reconciles host pools. // It stops periodical reconciliation. 
func (m *hostPoolManager) Stop() { if !m.lifecycle.Stop() { return } log.Info("Stopping host pool manager") // Clean up host pool manager in-memory cache m.mu.Lock() m.poolIndex = map[string]hostpool.HostPool{} m.hostToPoolMap = map[string]string{} // Release lock before Wait() to avoid deadlock with reconcile goroutine m.mu.Unlock() m.lifecycle.Wait() log.Info("Host pool manager stopped") } // Update DB to move host to default pool. func (m *hostPoolManager) moveToDefault(hostname string) error { const msg string = "reconcile: moving new host to default pool" err := m.updatePoolInHostInfo(hostname, common.DefaultHostPoolID) if err != nil { log.WithError(err). WithField(hostpool.HostnameKey, hostname). Error(msg + ": " + _updateHostPoolErrMsg) return err } log.WithField(hostpool.HostnameKey, hostname).Info(msg) m.publishPoolEvent(hostname, common.DefaultHostPoolID) return nil } // reconcile synchronizes hostToPoolMap and poolIndex with the pool information // from DB. // 1. Create default pool if required. // 2. For each host found in the DB: // a. If current_pool is not set, update it to default pool // b. Create a pool object if required // c. Ensure that the pool's host-list contains the host // d. Add the host->pool entry to a new hostToPoolMap // 3. Publish pool-change event if there are differences between // old and new hostToPoolMap instances. // 4. For each pool, remove all hosts that are no longer in that // pool as per the new hostToPoolMap. func (m *hostPoolManager) reconcile() error { sw := m.metrics.ReconcileTime.Start() m.mu.Lock() defer func() { m.mu.Unlock() sw.Stop() }() // Create default host pool if not exists. if _, ok := m.poolIndex[common.DefaultHostPoolID]; !ok { m.poolIndex[common.DefaultHostPoolID] = hostpool.New(common.DefaultHostPoolID, m.parentScope) log.WithField(hostpool.HostPoolKey, common.DefaultHostPoolID). 
Info("reconcile: created default pool") } ctx, cancel := context.WithTimeout(context.Background(), _defaultDBTimeout) defer cancel() // Fetch all hosts from DB. allHostInfo, err := m.hostInfoOps.GetAll(ctx) if err != nil { m.metrics.ReconcileErr.Inc(1) return errors.Wrap(err, "reconcile: failed to get hosts from DB: ") } // Build a new hostToPoolMap based on current_pool of each host in DB. // If current_pool is not set, set the value to default pool in DB. // Check if a pool with that name exists. Create one if required. // Ensure that the host is present in the host-list of the pool. newHostToPoolMap := make(map[string]string) for _, hi := range allHostInfo { hostname := hi.GetHostname() p := hi.GetCurrentPool() if p == "" { p = common.DefaultHostPoolID if err := m.moveToDefault(hostname); err != nil { continue } } newHostToPoolMap[hi.GetHostname()] = p oldpool, ok := m.poolIndex[p] if !ok { oldpool = hostpool.New(p, m.parentScope) m.poolIndex[p] = oldpool log.WithField(hostpool.HostPoolKey, p). Info("reconcile: registered new host pool") } // ensure host is in the list of hosts for the pool oldpool.Add(hostname) } // Compare old & new pools for each host, and publish an event if they // are different for h, oldPool := range m.hostToPoolMap { newPool, ok := newHostToPoolMap[h] if !ok || oldPool != newPool { m.publishPoolEvent(h, newPool) } } m.hostToPoolMap = newHostToPoolMap // Remove hosts from each pool which are not associated with the pool // according to new hostToPoolMap. for _, oldPool := range m.poolIndex { for hostname := range oldPool.Hosts() { newPool, ok := m.hostToPoolMap[hostname] if !ok || newPool != oldPool.ID() { oldPool.Delete(hostname) } } } m.refreshPoolCapacity() m.refreshMetrics() return nil } // refreshMetrics refreshes metrics of host pool cache and each host pool in the cache. func (m *hostPoolManager) refreshMetrics() { // Refresh metrics of each host pool in the host pool cache. 
for _, pool := range m.poolIndex { pool.RefreshMetrics() } // Refresh host pool cache metrics. m.metrics.TotalHosts.Update(float64(len(m.hostToPoolMap))) m.metrics.TotalPools.Update(float64(len(m.poolIndex))) } // refreshPoolCapacity recalculates the capacity of each host-pool. func (m *hostPoolManager) refreshPoolCapacity() { agentMap := host.GetAgentMap() if agentMap == nil { log.Warn("Failed to load agent-map in refreshPoolCapacity") return } hostCapacities := agentMap.HostCapacities for _, pool := range m.poolIndex { pool.RefreshCapacity(hostCapacities) } } // GetHostPoolLabelValues creates a LabelValues for host pool of a host. func GetHostPoolLabelValues( manager HostPoolManager, hostname string, ) (constraints.LabelValues, error) { lv := make(constraints.LabelValues) pool, err := manager.GetPoolByHostname(hostname) if err != nil { return lv, errors.Wrapf( err, "error when getting host pool of host %s", hostname, ) } lv[common.HostPoolKey] = map[string]uint32{pool.ID(): 1} return lv, nil }