// pkg/hostmgr/p2k/hostcache/hostsummary/basehostsummary.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package hostsummary
import (
"fmt"
"sync"
"time"
pbhost "github.com/uber/peloton/.gen/peloton/api/v1alpha/host"
"github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton"
pbpod "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
hostmgr "github.com/uber/peloton/.gen/peloton/private/hostmgr/v1alpha"
"github.com/uber/peloton/pkg/common/util"
"github.com/uber/peloton/pkg/common/v1alpha/constraints"
"github.com/uber/peloton/pkg/hostmgr/models"
p2kscalar "github.com/uber/peloton/pkg/hostmgr/p2k/scalar"
"github.com/uber/peloton/pkg/hostmgr/scalar"
"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"go.uber.org/yarpc/yarpcerrors"
)
// Make sure baseHostSummary implements HostSummary.
var _ HostSummary = &baseHostSummary{}
// HostStatus represents the status (Ready/Placing/Reserved) of the host in
// the host cache. Holds placed on a host for specific pods are tracked
// separately in heldPodIDs rather than as a host status.
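//
// The transitions implemented in this file look roughly as follows
// (illustrative sketch; explicit reservation is driven by callers through
// CasStatus):
//
//    ReadyHost   --TryMatch-------> PlacingHost
//    PlacingHost --CompleteLease--> ReadyHost
//    PlacingHost --TerminateLease-> ReadyHost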
type HostStatus int
const (
// ReadyHost represents a host ready to be used.
ReadyHost HostStatus = iota + 1
// PlacingHost represents a host being used by placement engine.
PlacingHost
// ReservedHost represents a host that is reserved for tasks.
ReservedHost
)
const (
// hostHeldStatusTimeout is the timeout after which a hold placed on a host
// for a pod expires and is released.
// TODO: Make this configurable (T3312219).
hostHeldStatusTimeout = 3 * time.Minute
// emptyLeaseID is used when the host is in READY state.
emptyLeaseID = ""
)
// baseHostSummary is a data struct holding resources and metadata of a host.
type baseHostSummary struct {
mu sync.RWMutex
// Hostname of the host.
hostname string
// Labels on this host.
labels []*peloton.Label
// List of port ranges available for allocation.
ports []*pbhost.PortRange
// Locking status of this host.
status HostStatus
// leaseID is a valid UUID when the host is locked for placement and is used
// to ensure that the host is used to launch only those pods for which the
// lease was acquired by the placement engine. It is empty when the host is
// not in Placing state. This leaseID does not correspond to a chunk of
// resources on the host, but to the entire host, so we run the risk of
// locking the entire host even if the resource constraint is small. We can
// optimize this further by maintaining a list of leaseIDs per host.
leaseID string
// Resource version of this host.
version string
// Strategy pattern adopted by the particular host.
strategy hostStrategy
// Capacity of the host.
capacity models.HostResources
// Resources allocated on the host. This should always be equal to the sum
// of resources in pods.
allocated models.HostResources
// Available resources on the host.
available models.HostResources
// A map of pods assigned to or running on this host.
// Key is the pod ID, value is the pod spec and current state.
pods *podInfoMap
// A map of podIDs for which the host is held.
// Key is the podID, value is the expiration time of the hold.
heldPodIDs map[string]time.Time
}
// newBaseHostSummary returns a baseHostSummary initialized to Ready status
// with a default port range and a noop host strategy.
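//
// A minimal construction sketch (illustrative; `capacity` is an assumed
// models.HostResources value and the caller is assumed to register the
// summary with the host cache elsewhere):
//
//    s := newBaseHostSummary("host-1", "resource-version-1")
//    s.SetCapacity(capacity)
//    s.SetAvailable(capacity) // nothing allocated yet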
func newBaseHostSummary(
hostname string,
version string,
) *baseHostSummary {
return &baseHostSummary{
status: ReadyHost,
hostname: hostname,
heldPodIDs: make(map[string]time.Time),
version: version,
strategy: &noopHostStrategy{},
pods: newPodInfoMap(),
// TODO: make the initial port range configurable.
ports: []*pbhost.PortRange{{Begin: 31000, End: 32000}},
}
}
// TryMatch atomically tries to match the current host with the given
// HostFilter, and locks the host if it matches. If the current baseHostSummary
// is matched, the host will be marked as `PLACING`, after which it cannot be
// used by another placement engine until released. If the current host is not
// matched by the given HostFilter, the host status remains unchanged.
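//
// Example (illustrative sketch; `filter` and `summary` are assumed to come
// from the placement engine code path):
//
//    match := summary.TryMatch(filter)
//    if match.Result == hostmgr.HostFilterResult_HOST_FILTER_MATCH {
//        lease := summary.GetHostLease()
//        // Hand lease.GetLeaseId().GetValue() to the placement engine; the
//        // same value must be passed back in CompleteLease or TerminateLease.
//    }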
func (a *baseHostSummary) TryMatch(
filter *hostmgr.HostFilter,
) Match {
a.mu.Lock()
defer a.mu.Unlock()
if a.status != ReadyHost {
return Match{
Result: hostmgr.HostFilterResult_HOST_FILTER_MISMATCH_STATUS,
}
}
// For a host that is held for pods, we anticipate in-place upgrades. So it
// is only a match when the filter hint contains this host, and we
// temporarily reject any additional pod placements on the host.
if a.isHeld() {
var hintFound bool
for _, hostHint := range filter.GetHint().GetHostHint() {
if hostHint.GetHostname() == a.hostname {
hintFound = true
break
}
}
if !hintFound {
return Match{
Result: hostmgr.HostFilterResult_HOST_FILTER_MISMATCH_STATUS,
}
}
}
result := a.matchHostFilter(filter)
if result != hostmgr.HostFilterResult_HOST_FILTER_MATCH {
return Match{Result: result}
}
// TODO: Handle oversubscription
// Setting status to `PlacingHost`: this ensures proper state tracking of
// resources on the host and also ensures that this host will not be used by
// another placement engine before it is released.
err := a.casStatus(a.status, PlacingHost)
if err != nil {
return Match{
Result: hostmgr.HostFilterResult_HOST_FILTER_MISMATCH_STATUS,
}
}
return Match{
Result: hostmgr.HostFilterResult_HOST_FILTER_MATCH,
HostName: a.hostname,
}
}
// CompleteLease verifies that the leaseID on this host is still valid.
// It checks that the current baseHostSummary is in Placing status, adds the
// given pods to the host summary, recalculates allocated resources and sets
// the host status back to Ready.
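//
// Example (illustrative sketch; `leaseID` is the value handed out via
// GetHostLease, and `podID`/`spec` are assumed to come from the launch
// request):
//
//    podToSpecMap := map[string]*pbpod.PodSpec{podID: spec}
//    if err := summary.CompleteLease(leaseID, podToSpecMap); err != nil {
//        // Lease mismatch, host no longer in Placing state, or duplicate pods.
//    }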
func (a *baseHostSummary) CompleteLease(
leaseID string,
podToSpecMap map[string]*pbpod.PodSpec,
) error {
a.mu.Lock()
defer a.mu.Unlock()
if a.status != PlacingHost {
return yarpcerrors.InvalidArgumentErrorf("host status is not Placing")
}
if a.leaseID != leaseID {
return yarpcerrors.InvalidArgumentErrorf("host leaseID does not match")
}
if err := a.casStatus(PlacingHost, ReadyHost); err != nil {
return yarpcerrors.InvalidArgumentErrorf("failed to unlock host: %s", err)
}
if err := a.validatePodsNotExist(podToSpecMap); err != nil {
return err
}
// Add to the pod map so postCompleteLease can work on up-to-date data;
// revert the change if postCompleteLease fails.
a.pods.AddPodSpecs(podToSpecMap)
if err := a.strategy.postCompleteLease(podToSpecMap); err != nil {
for id := range podToSpecMap {
a.pods.RemovePod(id)
}
return err
}
log.WithFields(log.Fields{
"hostname": a.hostname,
"pods": podToSpecMap,
}).Debug("pods added to the host for launch")
return nil
}
// validatePodsNotExist returns an error if any of the given pods already
// exists on the host.
func (a *baseHostSummary) validatePodsNotExist(
podToSpecMap map[string]*pbpod.PodSpec,
) error {
if podID := a.pods.AnyPodExist(podToSpecMap); podID != "" {
return yarpcerrors.InvalidArgumentErrorf("pod %v already exists on the host", podID)
}
return nil
}
// CasStatus sets the status to the new value if the current value equals
// old, otherwise it returns an error.
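//
// Example (illustrative sketch; an explicit reservation flow elsewhere in the
// host manager is assumed to drive this call):
//
//    if err := summary.CasStatus(ReadyHost, ReservedHost); err != nil {
//        // Status changed underneath us; the caller decides whether to retry.
//    }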
func (a *baseHostSummary) CasStatus(old, new HostStatus) error {
a.mu.Lock()
defer a.mu.Unlock()
if err := a.casStatus(old, new); err != nil {
return yarpcerrors.InvalidArgumentErrorf("failed to set cas status: %s", err)
}
return nil
}
// GetVersion returns the version of the host.
func (a *baseHostSummary) GetVersion() string {
a.mu.RLock()
defer a.mu.RUnlock()
return a.version
}
// SetVersion sets the version of the host.
func (a *baseHostSummary) SetVersion(v string) {
a.mu.Lock()
defer a.mu.Unlock()
a.version = v
}
// GetHostname returns the hostname of the host.
func (a *baseHostSummary) GetHostname() string {
a.mu.RLock()
defer a.mu.RUnlock()
return a.hostname
}
// GetHostStatus returns the HostStatus of the host.
func (a *baseHostSummary) GetHostStatus() HostStatus {
a.mu.RLock()
defer a.mu.RUnlock()
return a.status
}
// GetHostLease creates and returns a host lease.
func (a *baseHostSummary) GetHostLease() *hostmgr.HostLease {
a.mu.RLock()
defer a.mu.RUnlock()
return &hostmgr.HostLease{
LeaseId: &hostmgr.LeaseID{
Value: a.leaseID,
},
HostSummary: &pbhost.HostSummary{
Hostname: a.hostname,
// TODO: replace this with models.HostResources.
Resources: scalar.ToPelotonResources(a.available.NonSlack),
Labels: a.labels,
AvailablePorts: a.ports,
},
}
}
// TerminateLease is called when terminating the lease on a host.
// This will be called when a host in PLACING state is not used, and the
// placement engine decides to terminate its lease and set the host back to
// Ready.
func (a *baseHostSummary) TerminateLease(leaseID string) error {
a.mu.Lock()
defer a.mu.Unlock()
if a.status != PlacingHost {
return yarpcerrors.InvalidArgumentErrorf("invalid status %v", a.status)
}
// TODO: lease may be expired already.
if a.leaseID != leaseID {
return yarpcerrors.InvalidArgumentErrorf("host leaseID does not match")
}
if err := a.casStatus(PlacingHost, ReadyHost); err != nil {
return yarpcerrors.InvalidArgumentErrorf("failed to set cas status: %s", err)
}
return nil
}
// GetCapacity returns the capacity of the host.
func (a *baseHostSummary) GetCapacity() models.HostResources {
a.mu.RLock()
defer a.mu.RUnlock()
return a.capacity
}
// GetAllocated returns the allocation of the host.
func (a *baseHostSummary) GetAllocated() models.HostResources {
a.mu.RLock()
defer a.mu.RUnlock()
return a.allocated
}
// HoldForPod adds the pod to the heldPodIDs map when the host is not
// reserved. It is a noop if the pod already exists in the map.
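//
// Example (illustrative sketch of the in-place upgrade hold flow; `podID` is
// an assumed *peloton.PodID):
//
//    if err := summary.HoldForPod(podID); err != nil {
//        // Host is reserved and cannot be held.
//    }
//    // ... later, once the pod lands back on this host or the hold is no
//    // longer needed:
//    summary.ReleaseHoldForPod(podID)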
func (a *baseHostSummary) HoldForPod(id *peloton.PodID) error {
a.mu.Lock()
defer a.mu.Unlock()
if a.status == ReservedHost {
return yarpcerrors.InvalidArgumentErrorf("invalid status %v for holding", a.status)
}
if _, ok := a.heldPodIDs[id.GetValue()]; !ok {
a.heldPodIDs[id.GetValue()] = time.Now().Add(hostHeldStatusTimeout)
}
log.WithFields(log.Fields{
"hostname": a.hostname,
"pods_held": a.heldPodIDs,
"pod_id": id.GetValue(),
}).Debug("Hold for pod")
return nil
}
// ReleaseHoldForPod removes the pod from the heldPodIDs map. It should be
// called when:
// 1. the pod is upgraded in place, or
// 2. the hold for this pod expires.
func (a *baseHostSummary) ReleaseHoldForPod(id *peloton.PodID) {
a.mu.Lock()
defer a.mu.Unlock()
a.releaseHoldForPod(id)
return
}
// GetHeldPods returns a list of held PodIDs.
func (a *baseHostSummary) GetHeldPods() []*peloton.PodID {
a.mu.Lock()
defer a.mu.Unlock()
var result []*peloton.PodID
for id := range a.heldPodIDs {
result = append(result, &peloton.PodID{Value: id})
}
return result
}
// DeleteExpiredHolds deletes expired held pods in a hostSummary and returns
// whether the hostSummary is free of holds, the available resources, and
// the pods whose holds expired.
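//
// Example (illustrative sketch of a periodic sweep; the loop that drives this
// call is assumed to live in the host cache):
//
//    free, available, expired := summary.DeleteExpiredHolds(time.Now())
//    // free: true when no holds remain; available: resources that can be
//    // offered again; expired: pods whose holds were released.
//    _ = free
//    _ = available
//    _ = expired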
func (a *baseHostSummary) DeleteExpiredHolds(
deadline time.Time) (bool, models.HostResources, []*peloton.PodID) {
a.mu.Lock()
defer a.mu.Unlock()
var expired []*peloton.PodID
for id, expirationTime := range a.heldPodIDs {
if deadline.After(expirationTime) {
pod := &peloton.PodID{Value: id}
a.releaseHoldForPod(pod)
expired = append(expired, pod)
}
}
return !a.isHeld(), a.available, expired
}
func (a *baseHostSummary) releaseHoldForPod(id *peloton.PodID) {
if _, ok := a.heldPodIDs[id.GetValue()]; !ok {
// This can happen for various reasons, such as a task being launched again
// on the same host after the hold timed out.
log.WithFields(log.Fields{
"hostname": a.hostname,
"pod_id": id.GetValue(),
}).Info("Host not held for pod")
return
}
delete(a.heldPodIDs, id.GetValue())
log.WithFields(log.Fields{
"hostname": a.hostname,
"pod_id": id.GetValue(),
}).Debug("Release hold for pod")
}
// isHeld returns true when the number of held PodIDs is greater than zero.
func (a *baseHostSummary) isHeld() bool {
return len(a.heldPodIDs) > 0
}
// GetAvailable returns the available resources of the host.
func (a *baseHostSummary) GetAvailable() models.HostResources {
a.mu.RLock()
defer a.mu.RUnlock()
return a.available
}
// HandlePodEvent updates the host-to-pod map in baseHostSummary. Host
// summaries embedding this type may shadow the method, but they need to call
// this base implementation manually.
func (a *baseHostSummary) HandlePodEvent(event *p2kscalar.PodEvent) {
a.mu.Lock()
defer a.mu.Unlock()
a.handlePodEvent(event)
}
func (a *baseHostSummary) handlePodEvent(event *p2kscalar.PodEvent) {
podID := event.Event.GetPodId().GetValue()
switch event.EventType {
case p2kscalar.AddPod, p2kscalar.UpdatePod:
// We do not need to update a.pods.spec during an Add event, as it will
// always follow a Launch, which would populate the field.
// Update events only change the image of the pod, and as such the
// resource accounting doesn't change.
podState := pbpod.PodState(pbpod.PodState_value[event.Event.GetActualState()])
if util.IsPelotonPodStateTerminal(podState) {
a.pods.RemovePod(podID)
} else {
podInfo, ok := a.pods.GetPodInfo(podID)
if !ok {
return
}
podInfo.state = podState
}
return
case p2kscalar.DeletePod:
// If the pod was already deleted, removing it again is a noop, which is
// the correct behavior here.
a.pods.RemovePod(podID)
return
default:
log.WithField("pod_event", event).
Error("unsupported pod event type")
}
return
}
// SetCapacity sets the capacity of the host.
func (a *baseHostSummary) SetCapacity(r models.HostResources) {
a.mu.Lock()
defer a.mu.Unlock()
a.capacity = r
}
// SetAvailable sets the available resources of the host.
func (a *baseHostSummary) SetAvailable(r models.HostResources) {
a.mu.Lock()
defer a.mu.Unlock()
a.available = r
}
// casStatus sets the status to the new value and updates the lease ID if the
// current value equals old, otherwise it returns an error.
// This function assumes the baseHostSummary lock is held before calling.
func (a *baseHostSummary) casStatus(oldStatus, newStatus HostStatus) error {
if a.status != oldStatus {
return fmt.Errorf("invalid old status: %v", oldStatus)
}
a.status = newStatus
switch a.status {
case ReadyHost:
// Reset the lease ID when the host goes back to Ready.
a.leaseID = emptyLeaseID
case PlacingHost:
// Generate a new lease ID for a placing host.
a.leaseID = uuid.New()
case ReservedHost:
// Generate a new lease ID for a reserved host.
a.leaseID = uuid.New()
}
return nil
}
// matchHostFilter determines whether given HostFilter matches the host.
// This function assumes baseHostSummary lock is held before calling.
func (a *baseHostSummary) matchHostFilter(
c *hostmgr.HostFilter,
) hostmgr.HostFilterResult {
min := c.GetResourceConstraint().GetMinimum()
if min != nil {
// Get min required resources.
minRes := scalar.FromResourceSpec(min)
if !a.available.NonSlack.Contains(minRes) {
return hostmgr.HostFilterResult_HOST_FILTER_INSUFFICIENT_RESOURCES
}
}
// TODO: Match ports resources.
sc := c.GetSchedulingConstraint()
// If constraints don't specify an exclusive host, then reject
// hosts that are designated as exclusive.
if constraints.IsNonExclusiveConstraint(sc) &&
constraints.HasExclusiveLabel(a.labels) {
log.WithField("hostname", a.hostname).Debug("Skipped exclusive host")
return hostmgr.HostFilterResult_HOST_FILTER_MISMATCH_CONSTRAINTS
}
if sc == nil {
// No scheduling constraint, we have a match.
return hostmgr.HostFilterResult_HOST_FILTER_MATCH
}
// Only the evaluator based on host constraints is in use.
evaluator := constraints.NewEvaluator(
pbpod.LabelConstraint_LABEL_CONSTRAINT_KIND_HOST)
lv := constraints.GetHostLabelValues(a.hostname, a.labels)
result, err := evaluator.Evaluate(sc, lv)
if err != nil {
log.WithError(err).
Error("Evaluating input constraint")
return hostmgr.HostFilterResult_HOST_FILTER_MISMATCH_CONSTRAINTS
}
switch result {
case constraints.EvaluateResultMatch:
fallthrough
case constraints.EvaluateResultNotApplicable:
log.WithFields(log.Fields{
"labels": lv,
"hostname": a.hostname,
"constraint": sc,
}).Debug("Attributes match constraint")
default:
log.WithFields(log.Fields{
"labels": lv,
"hostname": a.hostname,
"constraint": sc,
}).Debug("Attributes do not match constraint")
return hostmgr.HostFilterResult_HOST_FILTER_MISMATCH_CONSTRAINTS
}
return hostmgr.HostFilterResult_HOST_FILTER_MATCH
}
// CompleteLaunchPod is a noop in baseHostSummary; host summaries embedding
// this type may shadow it to update state once a pod launch completes.
func (a *baseHostSummary) CompleteLaunchPod(pod *models.LaunchablePod) {
}
// RecoverPodInfo updates the pod info on the host. It is used only when the
// host summary needs to recover the info upon restart.
func (a *baseHostSummary) RecoverPodInfo(
id *peloton.PodID,
state pbpod.PodState,
spec *pbpod.PodSpec) {
a.mu.Lock()
defer a.mu.Unlock()
if util.IsPelotonPodStateTerminal(state) {
a.pods.RemovePod(id.GetValue())
return
}
info, ok := a.pods.GetPodInfo(id.GetValue())
if !ok {
a.pods.AddPodInfo(id.GetValue(), &podInfo{
spec: spec,
state: state,
})
return
}
info.state = state
info.spec = spec
}
// noopHostStrategy is a hostStrategy that performs no additional work after
// a lease completes.
type noopHostStrategy struct{}
func (s *noopHostStrategy) postCompleteLease(podToSpecMap map[string]*pbpod.PodSpec) error {
return nil
}
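// A hypothetical non-noop strategy could look like the sketch below; any other
// methods the hostStrategy interface may require, and the concrete strategies
// used by real host summaries, live elsewhere in this package and are omitted
// here.
//
//    type accountingHostStrategy struct{}
//
//    func (s *accountingHostStrategy) postCompleteLease(
//        podToSpecMap map[string]*pbpod.PodSpec,
//    ) error {
//        // e.g. verify that the newly added pods still fit on the host and
//        // update allocated/available resources accordingly.
//        return nil
//    }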