pkg/hostmgr/p2k/hostmgrsvc/handler.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hostmgrsvc

import (
	"context"
	"fmt"

	"github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton"
	pbpod "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
	hostmgr "github.com/uber/peloton/.gen/peloton/private/hostmgr/v1alpha"
	"github.com/uber/peloton/.gen/peloton/private/hostmgr/v1alpha/svc"
	"github.com/uber/peloton/pkg/common"
	"github.com/uber/peloton/pkg/hostmgr/models"
	"github.com/uber/peloton/pkg/hostmgr/p2k/hostcache"
	"github.com/uber/peloton/pkg/hostmgr/p2k/plugins"
	"github.com/uber/peloton/pkg/hostmgr/p2k/podeventmanager"
	"github.com/uber/peloton/pkg/hostmgr/scalar"

	log "github.com/sirupsen/logrus"
	"github.com/uber-go/tally"
	"go.uber.org/multierr"
	"go.uber.org/yarpc"
	"go.uber.org/yarpc/yarpcerrors"
)

// ServiceHandler implements private.hostmgr.v1alpha.svc.HostManagerService.
type ServiceHandler struct {
// Scheduler plugin.
plugin plugins.Plugin
// Host cache.
hostCache hostcache.HostCache
	// podEventManager exports the pod EventStream.
podEventManager podeventmanager.PodEventManager
}

// NewServiceHandler creates a new ServiceHandler.
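//
// A minimal usage sketch (illustrative only; dispatcher, scope, plugin,
// hostCache, and pem below are placeholders supplied by the caller, not
// symbols defined in this package):
//
//	handler := NewServiceHandler(dispatcher, scope, plugin, hostCache, pem)
//
// The constructor registers the YARPC procedures on the dispatcher, so the
// returned handler is ready to serve once the dispatcher starts.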
func NewServiceHandler(
d *yarpc.Dispatcher,
parent tally.Scope,
plugin plugins.Plugin,
hostCache hostcache.HostCache,
pem podeventmanager.PodEventManager,
) *ServiceHandler {
handler := &ServiceHandler{
plugin: plugin,
hostCache: hostCache,
podEventManager: pem,
}
d.Register(svc.BuildHostManagerServiceYARPCProcedures(handler))
return handler
}

// AcquireHosts implements HostManagerService.AcquireHosts.
func (h *ServiceHandler) AcquireHosts(
ctx context.Context,
req *svc.AcquireHostsRequest,
) (resp *svc.AcquireHostsResponse, err error) {
defer func() {
if err != nil {
log.WithField("req", req).
WithError(err).
Warn("HostMgr.AcquireHosts failed")
}
}()
	// TODO: call the v0 AcquireHostOffers API for Mesos and translate the
	// result into an AcquireHostsResponse: each HostOfferID becomes a LeaseID,
	// and the Mesos offers per host are translated into a HostSummary
	// (attributes become labels, offers become resources).
filter := req.GetFilter()
if filter == nil {
return nil, yarpcerrors.InternalErrorf("invalid host filter")
}
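	// Acquire leases from the host cache for hosts matching the filter; the
	// per-reason filter counts are returned so callers can see why hosts were
	// ruled out.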
leases, filterCount := h.hostCache.AcquireLeases(filter)
return &svc.AcquireHostsResponse{
Hosts: leases,
FilterResultCounts: filterCount,
}, nil
}

// LaunchPods implements HostManagerService.LaunchPods.
func (h *ServiceHandler) LaunchPods(
ctx context.Context,
req *svc.LaunchPodsRequest,
) (resp *svc.LaunchPodsResponse, err error) {
defer func() {
if err != nil {
log.WithFields(
log.Fields{
"launchable_pods": req.GetPods(),
"hostname": req.GetHostname(),
"lease_id": req.GetLeaseId().GetValue(),
}).
WithError(err).
Warn("HostMgr.LaunchPods failed")
}
}()
	// TODO: call the v0 LaunchTasks API for Mesos and translate the result
	// into a LaunchPodsResponse.
if err = validateLaunchPodsRequest(req); err != nil {
return nil, err
}
	// Check whether each pod to be launched is held on the requested host.
	// If the held host matches the requested host, or no host is held, this
	// is a no-op. If the held host differs from the host in the launch
	// request, release the hold on that host.
holdsToRelease := make(map[string][]*peloton.PodID)
for _, pod := range req.GetPods() {
hostHeld := h.hostCache.GetHostHeldForPod(pod.GetPodId())
if len(hostHeld) != 0 && hostHeld != req.GetHostname() {
log.WithFields(log.Fields{
"pod_id": pod.GetPodId().GetValue(),
"host_held": hostHeld,
"host_requested": req.GetHostname(),
}).Debug("Pod not launching on the host held")
holdsToRelease[hostHeld] = append(holdsToRelease[hostHeld], pod.GetPodId())
}
}
for hostHeld, pods := range holdsToRelease {
if err := h.hostCache.ReleaseHoldForPods(hostHeld, pods); err != nil {
			// Only log a warning so we get a sense of how often this happens.
			// It is okay to leave the hold in place because the pruner will
			// remove it eventually.
log.WithFields(log.Fields{
"pod_ids": pods,
"host_held": hostHeld,
"error": err,
}).Warn("Cannot release held host, relying on pruner for cleanup.")
}
}
	// Save the assigned ports on the first container of each PodSpec.
	podToSpecMap := make(map[string]*pbpod.PodSpec)
	for _, pod := range req.GetPods() {
		spec := pod.GetSpec()
		// Guard against a spec with no containers to avoid an index panic.
		if ports := pod.GetPorts(); len(ports) > 0 && len(spec.GetContainers()) > 0 {
			spec.GetContainers()[0].Ports = buildPortSpec(ports)
		}
		// TODO: should we check for a repeated pod ID here?
		podToSpecMap[pod.GetPodId().GetValue()] = spec
	}
if err = h.hostCache.CompleteLease(
req.GetHostname(),
req.GetLeaseId().GetValue(),
podToSpecMap,
); err != nil {
return nil, err
}
	log.WithFields(log.Fields{
		"lease_id": req.GetLeaseId().GetValue(),
		"hostname": req.GetHostname(),
		"pods":     podToSpecMap,
	}).Debug("Completed lease, launching pods")
var launchablePods []*models.LaunchablePod
for _, pod := range req.GetPods() {
launchablePods = append(launchablePods, &models.LaunchablePod{
PodId: pod.GetPodId(),
Spec: pod.GetSpec(),
Ports: pod.GetPorts(),
})
}
	// TODO: should we check for repeated pod IDs here?
launched, err := h.plugin.LaunchPods(
ctx,
launchablePods,
req.GetHostname(),
)
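	// Record pods the plugin reports as launched in the host cache before
	// checking the error, so pods from a partial launch are still accounted for.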
for _, pod := range launched {
h.hostCache.CompleteLaunchPod(req.GetHostname(), pod)
}
if err != nil {
return nil, err
}
return &svc.LaunchPodsResponse{}, nil
}
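
// buildPortSpec converts a name-to-port map into PodSpec port entries,
// reusing the port name as the exported environment variable name.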
func buildPortSpec(ports map[string]uint32) (pss []*pbpod.PortSpec) {
for k, v := range ports {
pss = append(pss, &pbpod.PortSpec{
Name: k,
Value: v,
EnvName: k,
})
}
return pss
}

// KillPods implements HostManagerService.KillPods.
func (h *ServiceHandler) KillPods(
ctx context.Context,
req *svc.KillPodsRequest,
) (resp *svc.KillPodsResponse, err error) {
defer func() {
if err != nil {
log.WithField("pod_ids", req.GetPodIds()).
WithError(err).
Warn("HostMgr.KillPods failed")
}
}()
	for _, podID := range req.GetPodIds() {
		if err := h.plugin.KillPod(ctx, podID.GetValue()); err != nil {
			return nil, err
		}
	}
	log.WithFields(log.Fields{
		"pod_ids": req.GetPodIds(),
	}).Debug("KillPods success")
	return &svc.KillPodsResponse{}, nil
}

// KillAndHoldPods implements HostManagerService.KillAndHoldPods.
func (h *ServiceHandler) KillAndHoldPods(
ctx context.Context,
req *svc.KillAndHoldPodsRequest,
) (resp *svc.KillAndHoldPodsResponse, err error) {
defer func() {
if err != nil {
log.WithField("pod_entries", req.GetEntries()).
WithError(err).
Warn("HostMgr.KillAndHoldPods failed")
}
}()
// Hold hosts for pods and log failures if any.
podsToHold := make(map[string][]*peloton.PodID)
for _, entry := range req.GetEntries() {
podsToHold[entry.GetHostToHold()] = append(
podsToHold[entry.GetHostToHold()], entry.GetPodId())
}
for host, pods := range podsToHold {
if err := h.hostCache.HoldForPods(host, pods); err != nil {
log.WithFields(log.Fields{
"host": host,
"pod_ids": pods,
}).WithError(err).
Warn("Failed to hold the host")
}
}
	log.WithFields(log.Fields{
		"pod_entries": req.GetEntries(),
	}).Debug("KillAndHoldPods: holds placed, proceeding to kill pods")
	// Kill the pods. Release the hold on a host if killing one of its pods fails.
var errs []error
var failed []*peloton.PodID
holdToRelease := make(map[string][]*peloton.PodID)
for _, entry := range req.GetEntries() {
// TODO: kill pods in parallel.
err := h.plugin.KillPod(ctx, entry.GetPodId().GetValue())
if err != nil {
errs = append(errs, err)
failed = append(failed, entry.GetPodId())
holdToRelease[entry.GetHostToHold()] = append(
holdToRelease[entry.GetHostToHold()], entry.GetPodId())
}
}
for host, pods := range holdToRelease {
if err := h.hostCache.ReleaseHoldForPods(host, pods); err != nil {
log.WithFields(log.Fields{
"host": host,
"error": err,
}).Warn("Failed to release host")
}
}
if len(errs) != 0 {
return &svc.KillAndHoldPodsResponse{},
yarpcerrors.InternalErrorf(multierr.Combine(errs...).Error())
}
return &svc.KillAndHoldPodsResponse{}, nil
}

// ClusterCapacity implements HostManagerService.ClusterCapacity.
func (h *ServiceHandler) ClusterCapacity(
ctx context.Context,
req *svc.ClusterCapacityRequest,
) (resp *svc.ClusterCapacityResponse, err error) {
capacity, allocation := h.hostCache.GetClusterCapacity()
return &svc.ClusterCapacityResponse{
Capacity: toHostMgrSvcResources(capacity),
Allocation: toHostMgrSvcResources(allocation),
}, nil
}

// GetEvents returns all outstanding pod events in the event stream.
// It is for debugging purposes only.
func (h *ServiceHandler) GetEvents(
ctx context.Context,
req *svc.GetEventsRequest,
) (resp *svc.GetEventsResponse, err error) {
events, err := h.podEventManager.GetEvents()
if err != nil {
return nil, err
}
return &svc.GetEventsResponse{Events: events}, nil
}

// TerminateLeases implements HostManagerService.TerminateLeases.
func (h *ServiceHandler) TerminateLeases(
ctx context.Context,
req *svc.TerminateLeasesRequest,
) (resp *svc.TerminateLeasesResponse, err error) {
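	// Best effort: attempt to terminate every lease and aggregate the errors
	// so that one bad lease does not prevent terminating the rest.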
var errs error
	for _, lease := range req.GetLeases() {
		err := h.hostCache.TerminateLease(lease.GetHostname(), lease.GetLeaseId().GetValue())
		errs = multierr.Append(errs, err)
	}
}
if errs != nil {
return nil, yarpcerrors.InternalErrorf(errs.Error())
}
return &svc.TerminateLeasesResponse{}, nil
}

// GetHostCache returns a dump of the host cache.
func (h *ServiceHandler) GetHostCache(
ctx context.Context,
req *svc.GetHostCacheRequest,
) (resp *svc.GetHostCacheResponse, err error) {
resp = &svc.GetHostCacheResponse{
Summaries: []*svc.GetHostCacheResponse_Summary{},
}
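	// Flatten each host summary into its non-slack allocation and capacity.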
for _, summary := range h.hostCache.GetSummaries() {
allocation, capacity := summary.GetAllocated(), summary.GetCapacity()
resp.Summaries = append(resp.Summaries, &svc.GetHostCacheResponse_Summary{
Hostname: summary.GetHostname(),
Status: fmt.Sprintf("%v", summary.GetHostStatus()),
			Allocation: []*hostmgr.Resource{
				{Kind: "cpu", Capacity: allocation.NonSlack.CPU},
				{Kind: "mem", Capacity: allocation.NonSlack.Mem},
				{Kind: "disk", Capacity: allocation.NonSlack.Disk},
				{Kind: "gpu", Capacity: allocation.NonSlack.GPU},
			},
			Capacity: []*hostmgr.Resource{
				{Kind: "cpu", Capacity: capacity.NonSlack.CPU},
				{Kind: "mem", Capacity: capacity.NonSlack.Mem},
				{Kind: "disk", Capacity: capacity.NonSlack.Disk},
				{Kind: "gpu", Capacity: capacity.NonSlack.GPU},
			},
})
}
return resp, nil
}

// validateLaunchPodsRequest performs basic validation on a LaunchPodsRequest.
func validateLaunchPodsRequest(req *svc.LaunchPodsRequest) error {
	if len(req.GetPods()) == 0 {
return yarpcerrors.InternalErrorf("Empty pods list")
}
if req.GetLeaseId().GetValue() == "" {
return yarpcerrors.InternalErrorf("Empty lease id")
}
if req.GetHostname() == "" {
return yarpcerrors.InternalErrorf("Empty host name")
}
return nil
}

// toHostMgrSvcResources converts scalar.Resources into the hostmgr service
// resource format.
func toHostMgrSvcResources(r scalar.Resources) []*hostmgr.Resource {
return []*hostmgr.Resource{
{
Kind: common.CPU,
Capacity: r.CPU,
}, {
Kind: common.DISK,
Capacity: r.Disk,
}, {
Kind: common.GPU,
Capacity: r.GPU,
}, {
Kind: common.MEMORY,
Capacity: r.Mem,
},
}
}

// NewTestServiceHandler returns an empty ServiceHandler pointer for testing.
func NewTestServiceHandler() *ServiceHandler {
return &ServiceHandler{}
}