pkg/hostmgr/handler.go (1,259 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hostmgr

import (
    "context"
    "fmt"
    "strings"
    "sync"
    "time"

    mesos "github.com/uber/peloton/.gen/mesos/v1"
    sched "github.com/uber/peloton/.gen/mesos/v1/scheduler"
    hpb "github.com/uber/peloton/.gen/peloton/api/v0/host"
    "github.com/uber/peloton/.gen/peloton/api/v0/peloton"
    pb_task "github.com/uber/peloton/.gen/peloton/api/v0/task"
    halphapb "github.com/uber/peloton/.gen/peloton/api/v1alpha/host"
    pb_eventstream "github.com/uber/peloton/.gen/peloton/private/eventstream"
    "github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"

    "github.com/uber/peloton/pkg/common"
    "github.com/uber/peloton/pkg/common/api"
    "github.com/uber/peloton/pkg/common/constraints"
    "github.com/uber/peloton/pkg/common/util"
    yarpcutil "github.com/uber/peloton/pkg/common/util/yarpc"
    "github.com/uber/peloton/pkg/hostmgr/config"
    "github.com/uber/peloton/pkg/hostmgr/goalstate"
    "github.com/uber/peloton/pkg/hostmgr/host"
    "github.com/uber/peloton/pkg/hostmgr/hostpool/manager"
    hostmgr_mesos "github.com/uber/peloton/pkg/hostmgr/mesos"
    "github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/encoding/mpb"
    "github.com/uber/peloton/pkg/hostmgr/metrics"
    "github.com/uber/peloton/pkg/hostmgr/models"
    "github.com/uber/peloton/pkg/hostmgr/offer"
    "github.com/uber/peloton/pkg/hostmgr/offer/offerpool"
    "github.com/uber/peloton/pkg/hostmgr/p2k/hostcache"
    "github.com/uber/peloton/pkg/hostmgr/p2k/plugins"
    "github.com/uber/peloton/pkg/hostmgr/reserver"
    "github.com/uber/peloton/pkg/hostmgr/scalar"
    "github.com/uber/peloton/pkg/hostmgr/summary"
    hmutil "github.com/uber/peloton/pkg/hostmgr/util"
    "github.com/uber/peloton/pkg/hostmgr/watchevent"
    ormobjects "github.com/uber/peloton/pkg/storage/objects"

    "github.com/pkg/errors"
    log "github.com/sirupsen/logrus"
    "github.com/uber-go/tally"
    "go.uber.org/atomic"
    "go.uber.org/multierr"
    "go.uber.org/yarpc"
    "go.uber.org/yarpc/yarpcerrors"
)

const (
    // This is the number of completed reservations which
    // will be fetched in one call from the reserver.
    _completedReservationLimit = 10
)

// validation errors
var (
    errEmptyExecutorList                 = errors.New("empty executor list")
    errEmptyExecutorID                   = errors.New("empty executor id")
    errEmptyOfferOperations              = errors.New("empty operations")
    errEmptyTaskList                     = errors.New("empty task list")
    errEmptyAgentID                      = errors.New("empty agent id")
    errEmptyHostName                     = errors.New("empty hostname")
    errEmptyHostOfferID                  = errors.New("empty host offer")
    errNilReservation                    = errors.New("reservation is nil")
    errLaunchOperationIsNotLastOperation = errors.New("launch operation is not the last operation")
    errReservationNotFound               = errors.New("reservation could not be made")
)

// ServiceHandler implements peloton.private.hostmgr.InternalHostService.
type ServiceHandler struct {
    schedulerClient       mpb.SchedulerClient
    operatorMasterClient  mpb.MasterOperatorClient
    metrics               *metrics.Metrics
    offerPool             offerpool.Pool
    frameworkInfoProvider hostmgr_mesos.FrameworkInfoProvider
    roleName              string
    mesosDetector         hostmgr_mesos.MasterDetector
    reserver              reserver.Reserver
    hmConfig              config.Config
    slackResourceTypes    []string
    watchProcessor        watchevent.WatchProcessor
    disableKillTasks      atomic.Bool
    hostPoolManager       manager.HostPoolManager
    goalStateDriver       goalstate.Driver
    hostInfoOps           ormobjects.HostInfoOps // DB ops for host_info table
    hostCache             hostcache.HostCache
    plugin                plugins.Plugin
}

// NewServiceHandler creates a new ServiceHandler.
func NewServiceHandler(
    d *yarpc.Dispatcher,
    metrics *metrics.Metrics,
    schedulerClient mpb.SchedulerClient,
    masterOperatorClient mpb.MasterOperatorClient,
    frameworkInfoProvider hostmgr_mesos.FrameworkInfoProvider,
    mesosConfig hostmgr_mesos.Config,
    mesosDetector hostmgr_mesos.MasterDetector,
    hmConfig *config.Config,
    slackResourceTypes []string,
    watchProcessor watchevent.WatchProcessor,
    hostPoolManager manager.HostPoolManager,
    goalStateDriver goalstate.Driver,
    hostInfoOps ormobjects.HostInfoOps,
    hostCache hostcache.HostCache,
    plugin plugins.Plugin,
) *ServiceHandler {

    handler := &ServiceHandler{
        schedulerClient:       schedulerClient,
        operatorMasterClient:  masterOperatorClient,
        metrics:               metrics,
        offerPool:             offer.GetEventHandler().GetOfferPool(),
        frameworkInfoProvider: frameworkInfoProvider,
        roleName:              mesosConfig.Framework.Role,
        mesosDetector:         mesosDetector,
        slackResourceTypes:    slackResourceTypes,
        watchProcessor:        watchProcessor,
        hostPoolManager:       hostPoolManager,
        goalStateDriver:       goalStateDriver,
        hostInfoOps:           hostInfoOps,
        hostCache:             hostCache,
        plugin:                plugin,
    }

    // Create the Reserver object for the handler.
    handler.reserver = reserver.NewReserver(
        handler.metrics,
        hmConfig,
        handler.offerPool)

    d.Register(hostsvc.BuildInternalHostServiceYARPCProcedures(handler))

    return handler
}

// validateHostFilter validates the host filter passed to an
// AcquireHostOffers or GetHosts request.
func validateHostFilter(
    filter *hostsvc.HostFilter) *hostsvc.InvalidHostFilter {
    if filter == nil {
        return &hostsvc.InvalidHostFilter{
            Message: "Empty host filter",
        }
    }
    return nil
}

// GetReserver returns the reserver object.
func (h *ServiceHandler) GetReserver() reserver.Reserver {
    return h.reserver
}

// DisableKillTasks toggles the flag to disable sending kill task requests
// to the Mesos master.
func (h *ServiceHandler) DisableKillTasks(
    ctx context.Context,
    body *hostsvc.DisableKillTasksRequest,
) (*hostsvc.DisableKillTasksResponse, error) {

    h.disableKillTasks.Store(true)
    log.Info("Disabled the kill tasks request to mesos master. ",
        "To enable restart host manager")

    return &hostsvc.DisableKillTasksResponse{}, nil
}

// GetOutstandingOffers returns all the offers present in the offer pool.
func (h *ServiceHandler) GetOutstandingOffers(
    ctx context.Context,
    body *hostsvc.GetOutstandingOffersRequest,
) (*hostsvc.GetOutstandingOffersResponse, error) {

    hostOffers, count := h.offerPool.GetAllOffers()
    if count == 0 {
        return &hostsvc.GetOutstandingOffersResponse{
            Error: &hostsvc.GetOutstandingOffersResponse_Error{
                NoOffers: &hostsvc.NoOffersError{
                    Message: "no offers present in offer pool",
                },
            },
        }, nil
    }

    outstandingOffers := make([]*mesos.Offer, 0, count)

    for _, hostOffer := range hostOffers {
        for _, offer := range hostOffer {
            outstandingOffers = append(outstandingOffers, offer)
        }
    }

    return &hostsvc.GetOutstandingOffersResponse{
        Offers: outstandingOffers,
    }, nil
}

// GetHostsByQuery implements InternalHostService.GetHostsByQuery.
// This function gets host resources from the offer pool and filters
// the host list based on the requirements passed in the request
// through hostsvc.HostFilter.
func (h *ServiceHandler) GetHostsByQuery(
    ctx context.Context,
    body *hostsvc.GetHostsByQueryRequest,
) (*hostsvc.GetHostsByQueryResponse, error) {
    filterHostnames := body.GetHostnames()
    hostSummaries, _ := h.offerPool.GetHostSummaries(filterHostnames)
    hostSummariesCount := len(hostSummaries)

    if hostSummariesCount == 0 {
        return &hostsvc.GetHostsByQueryResponse{}, nil
    }

    cmpLess := body.GetCmpLess()
    resourcesLimit := scalar.FromResourceConfig(body.GetResource())

    hosts := make([]*hostsvc.GetHostsByQueryResponse_Host, 0, hostSummariesCount)
    for hostname, hostSummary := range hostSummaries {
        resources := scalar.FromOffersMapToMesosResources(hostSummary.GetOffers(summary.All))

        if !body.GetIncludeRevocable() {
            _, nonRevocable := scalar.FilterRevocableMesosResources(resources)
            nonRevocableResources := scalar.FromMesosResources(nonRevocable)
            if !nonRevocableResources.Compare(resourcesLimit, cmpLess) {
                continue
            }
            hosts = append(hosts, &hostsvc.GetHostsByQueryResponse_Host{
                Hostname:  hostname,
                Resources: nonRevocable,
                Status:    toHostStatus(hostSummary.GetHostStatus()),
                HeldTasks: hostSummary.GetHeldTask(),
                Tasks:     hostSummary.GetTasks(),
            })
        } else {
            combined, _ := scalar.FilterMesosResources(
                resources,
                func(r *mesos.Resource) bool {
                    if r.GetRevocable() != nil {
                        return true
                    }
                    return !hmutil.IsSlackResourceType(r.GetName(), h.slackResourceTypes)
                })

            combinedResources := scalar.FromMesosResources(combined)
            if !combinedResources.Compare(resourcesLimit, cmpLess) {
                continue
            }
            hosts = append(hosts, &hostsvc.GetHostsByQueryResponse_Host{
                Hostname:  hostname,
                Resources: combined,
                Status:    toHostStatus(hostSummary.GetHostStatus()),
                HeldTasks: hostSummary.GetHeldTask(),
                Tasks:     hostSummary.GetTasks(),
            })
        }
    }

    return &hostsvc.GetHostsByQueryResponse{
        Hosts: hosts,
    }, nil
}

// WatchHostSummaryEvent creates a watch to get notified about changes to
// Host Summary events. Changed objects are streamed back to the caller
// until the watch is cancelled.
func (h *ServiceHandler) WatchHostSummaryEvent(
    req *hostsvc.WatchEventRequest,
    stream hostsvc.InternalHostServiceServiceWatchHostSummaryEventYARPCServer,
) error {
    // Create watch for host summary event
    log.WithField("request", req).
        Debug("starting new event watch")

    topic := watchevent.GetTopicFromInput(req.GetTopic())
    if topic != watchevent.HostSummary {
        return yarpcerrors.InvalidArgumentErrorf("Invalid topic expected hostSummary")
    }

    watchID, eventClient, err := h.watchProcessor.NewEventClient(topic)
    if err != nil {
        log.WithError(err).
Warn("failed to create watch client") return err } defer func() { h.watchProcessor.StopEventClient(watchID) }() initResp := &hostsvc.WatchHostSummaryEventResponse{ WatchId: watchID, Topic: req.GetTopic(), } if err := stream.Send(initResp); err != nil { log.WithField("watch_id", watchID). WithError(err). Warn("failed to send initial response for watch event") return err } for { select { case event := <-eventClient.Input: topicOfEvent := watchevent.GetTopicFromTheEvent(event) if topicOfEvent != watchevent.HostSummary { log.Warn("watch processor sends wrong event, expected hostSummary received different object") return errors.New("watch processor sends different topic than required") } resp := &hostsvc.WatchHostSummaryEventResponse{ WatchId: watchID, Topic: req.GetTopic(), HostSummaryEvent: event.(*halphapb.HostSummary), } if err := stream.Send(resp); err != nil { log.WithField("watch_id", watchID). WithError(err). Warn("failed to send response for watch event") return err } case s := <-eventClient.Signal: log.WithFields(log.Fields{ "watch_id": watchID, "signal": s, }).Debug("received signal") err := handleSignal( watchID, s, map[watchevent.StopSignal]tally.Counter{ watchevent.StopSignalCancel: h.metrics.WatchEventCancel, watchevent.StopSignalOverflow: h.metrics.WatchEventOverflow, }, ) if !yarpcerrors.IsCancelled(err) { log.WithField("watch_id", watchID). WithError(err). Warn("watch stopped due to signal") } return err } } } // WatchEventStreamEvent creates a watch to get notified about changes to mesos task update event. // Changed objects are streamed back to the caller till the watch is // cancelled. func (h *ServiceHandler) WatchEventStreamEvent( req *hostsvc.WatchEventRequest, stream hostsvc.InternalHostServiceServiceWatchEventStreamEventYARPCServer, ) error { // Create watch for mesos task update log.WithField("request", req). Debug("starting new event watch") topic := watchevent.GetTopicFromInput(req.GetTopic()) if topic != watchevent.EventStream { return yarpcerrors.InvalidArgumentErrorf("Invalid topic expected eventstream") } watchID, eventClient, err := h.watchProcessor.NewEventClient(topic) if err != nil { log.WithError(err). Warn("failed to create watch client") return err } defer func() { h.watchProcessor.StopEventClient(watchID) }() initResp := &hostsvc.WatchEventStreamEventResponse{ WatchId: watchID, Topic: req.GetTopic(), } if err := stream.Send(initResp); err != nil { log.WithField("watch_id", watchID). WithError(err). Warn("failed to send initial response for watch event") return err } for { select { case event := <-eventClient.Input: topicOfEvent := watchevent.GetTopicFromTheEvent(event) if topicOfEvent != watchevent.EventStream { log.Warn("watch processor not sending right event, expected eventstream, received different object") return errors.New("watch processor sending different topic than required") } resp := &hostsvc.WatchEventStreamEventResponse{ WatchId: watchID, Topic: req.GetTopic(), MesosTaskUpdate: event.(*pb_eventstream.Event), } if err := stream.Send(resp); err != nil { log.WithField("watch_id", watchID). WithError(err). 
Warn("failed to send response for watch event") return err } case s := <-eventClient.Signal: log.WithFields(log.Fields{ "watch_id": watchID, "signal": s, }).Debug("received signal") err := handleSignal( watchID, s, map[watchevent.StopSignal]tally.Counter{ watchevent.StopSignalCancel: h.metrics.WatchEventCancel, watchevent.StopSignalOverflow: h.metrics.WatchEventOverflow, }, ) if !yarpcerrors.IsCancelled(err) { log.WithField("watch_id", watchID). WithError(err). Warn("watch stopped due to signal") } return err } } } // handleSignal converts StopSignal to appropriate yarpcerror func handleSignal( watchID string, s watchevent.StopSignal, metrics map[watchevent.StopSignal]tally.Counter, ) error { c := metrics[s] if c != nil { c.Inc(1) } switch s { case watchevent.StopSignalCancel: return yarpcerrors.CancelledErrorf("watch cancelled: %s", watchID) case watchevent.StopSignalOverflow: return yarpcerrors.InternalErrorf("event overflow: %s", watchID) default: return yarpcerrors.InternalErrorf("unexpected signal: %s", s) } } // Cancel cancels a watch. The watch stream will get an error indicating // watch was cancelled and the stream will be closed. func (h *ServiceHandler) CancelWatchEvent( ctx context.Context, req *hostsvc.CancelWatchRequest, ) (*hostsvc.CancelWatchResponse, error) { watchID := req.GetWatchId() err := h.watchProcessor.StopEventClient(watchID) if err != nil { if yarpcerrors.IsNotFound(err) { h.metrics.WatchCancelNotFound.Inc(1) } log.WithField("watch_id", watchID). WithError(err). Warn("failed to stop task client") return nil, err } return &hostsvc.CancelWatchResponse{}, nil } // AcquireHostOffers implements InternalHostService.AcquireHostOffers. func (h *ServiceHandler) AcquireHostOffers( ctx context.Context, body *hostsvc.AcquireHostOffersRequest, ) (response *hostsvc.AcquireHostOffersResponse, err error) { defer func() { if err != nil { err = yarpcutil.ConvertToYARPCError(err) return } h.metrics.AcquireHostOffers.Inc(1) h.metrics.AcquireHostOffersCount.Inc(int64(len(response.HostOffers))) }() if invalid := validateHostFilter(body.GetFilter()); invalid != nil { err = yarpcerrors.InvalidArgumentErrorf("invalid filter") h.metrics.AcquireHostOffersInvalid.Inc(1) log.WithField("filter", body.GetFilter()). Warn("Invalid Filter") return &hostsvc.AcquireHostOffersResponse{ Error: &hostsvc.AcquireHostOffersResponse_Error{ InvalidHostFilter: invalid, }, }, errors.Wrap(err, "invalid filter") } result, resultCount, err := h.offerPool.ClaimForPlace(ctx, body.GetFilter()) if err != nil { h.metrics.AcquireHostOffersFail.Inc(1) log.WithField("filter", body.GetFilter()). WithError(err). Warn("ClaimForPlace failed") return &hostsvc.AcquireHostOffersResponse{ Error: &hostsvc.AcquireHostOffersResponse_Error{ Failure: &hostsvc.AcquireHostOffersFailure{ Message: err.Error(), }, }, }, errors.Wrap(err, "claim for place failed") } response = &hostsvc.AcquireHostOffersResponse{ HostOffers: []*hostsvc.HostOffer{}, FilterResultCounts: resultCount, } for hostname, hostOffer := range result { // ClaimForPlace returns offers grouped by host. Thus every // offer in hostOffer should have the same value for // host-specific information such as AgentId, Attributes etc. offers := hostOffer.Offers if len(offers) <= 0 { log.WithField("host", hostname). Warn("Empty offer slice from host") continue } var resources []*mesos.Resource for _, offer := range offers { resources = append(resources, offer.GetResources()...) 
        }

        // create the peloton host offer
        pHostOffer := hostsvc.HostOffer{
            Hostname:   hostname,
            AgentId:    offers[0].GetAgentId(),
            Attributes: offers[0].GetAttributes(),
            Resources:  resources,
            Id:         &peloton.HostOfferID{Value: hostOffer.ID},
        }

        response.HostOffers = append(response.HostOffers, &pHostOffer)

        log.WithFields(log.Fields{
            "hostname":      hostname,
            "host_offer_id": hostOffer.ID,
        }).Info("AcquireHostOffers")
    }

    return response, nil
}

// GetHosts implements InternalHostService.GetHosts.
// This function gets the hosts based on the resource requirements
// and constraints passed in the request through hostsvc.HostFilter.
func (h *ServiceHandler) GetHosts(
    ctx context.Context,
    body *hostsvc.GetHostsRequest,
) (response *hostsvc.GetHostsResponse, err error) {
    var hosts []*hostsvc.HostInfo
    defer func() {
        if err != nil {
            h.metrics.GetHostsInvalid.Inc(1)
            err = yarpcerrors.Newf(yarpcerrors.CodeInternal, err.Error())
            return
        }

        log.WithField("body", body).Debug("GetHosts called")

        h.metrics.GetHosts.Inc(1)
        h.metrics.GetHostsCount.Inc(int64(len(hosts)))
    }()

    if invalid := validateHostFilter(body.GetFilter()); invalid != nil {
        response = h.processGetHostsFailure(invalid)
        return response, errors.New("invalid host filter")
    }

    matcher := NewMatcher(
        body.GetFilter(),
        constraints.NewEvaluator(pb_task.LabelConstraint_HOST),
        h.hostPoolManager,
        func(resourceType string) bool {
            return hmutil.IsSlackResourceType(resourceType, h.slackResourceTypes)
        })

    result, matchErr := matcher.GetMatchingHosts()
    if matchErr != nil {
        response = h.processGetHostsFailure(matchErr)
        return response, errors.New(matchErr.GetMessage())
    }

    for hostname, agentInfo := range result {
        hosts = append(hosts, util.CreateHostInfo(hostname, agentInfo))
    }

    response = &hostsvc.GetHostsResponse{
        Hosts: hosts,
    }

    return response, nil
}

// processGetHostsFailure processes a GetHosts failure and returns the
// corresponding error in the response, otherwise an empty error.
func (h *ServiceHandler) processGetHostsFailure(
    err interface{},
) (resp *hostsvc.GetHostsResponse) {
    resp = &hostsvc.GetHostsResponse{
        Error: &hostsvc.GetHostsResponse_Error{},
    }

    if filter, ok := err.(*hostsvc.InvalidHostFilter); ok {
        log.WithField("error", filter.Message).Warn("no matching hosts")
        resp.Error.InvalidHostFilter = filter
    }

    if hostFailure, ok := err.(*hostsvc.GetHostsFailure); ok {
        log.WithField("error", hostFailure.Message).Warn("no matching hosts")
        resp.Error.Failure = hostFailure
    }

    return resp
}

// ReleaseHostOffers implements InternalHostService.ReleaseHostOffers.
func (h *ServiceHandler) ReleaseHostOffers(
    ctx context.Context,
    body *hostsvc.ReleaseHostOffersRequest,
) (response *hostsvc.ReleaseHostOffersResponse, err error) {
    defer func() {
        if err != nil {
            h.metrics.ReleaseHostOffersFail.Inc(1)
            err = yarpcutil.ConvertToYARPCError(err)
            return
        }

        h.metrics.ReleaseHostOffers.Inc(1)
        h.metrics.ReleaseHostsCount.Inc(int64(len(body.GetHostOffers())))
    }()

    for _, hostOffer := range body.GetHostOffers() {
        hostname := hostOffer.GetHostname()
        if err := h.offerPool.ReturnUnusedOffers(hostname); err != nil {
            log.WithField("hostoffer", hostOffer).
                WithError(err).
                Warn("Cannot return unused offer on host.")
        }

        log.WithFields(log.Fields{
            "hostname":      hostname,
            "agent_id":      hostOffer.GetAgentId().GetValue(),
            "host_offer_id": hostOffer.GetId().GetValue(),
        }).Info("ReleaseHostOffers")
    }

    return &hostsvc.ReleaseHostOffersResponse{}, nil
}

// OfferOperations implements InternalHostService.OfferOperations.
func (h *ServiceHandler) OfferOperations(
    ctx context.Context,
    req *hostsvc.OfferOperationsRequest) (
    *hostsvc.OfferOperationsResponse, error) {
    return nil, fmt.Errorf("Unimplemented")
}

// LaunchTasks implements InternalHostService.LaunchTasks.
func (h *ServiceHandler) LaunchTasks(
    ctx context.Context,
    req *hostsvc.LaunchTasksRequest,
) (response *hostsvc.LaunchTasksResponse, err error) {
    defer func() {
        if err != nil {
            err = yarpcutil.ConvertToYARPCError(err)
            return
        }
    }()

    if err := validateLaunchTasks(req); err != nil {
        err = yarpcerrors.InvalidArgumentErrorf("%s", err)

        log.WithFields(log.Fields{
            "hostname":       req.GetHostname(),
            "host_offer_id":  req.GetId(),
            "mesos_agent_id": req.GetAgentId(),
        }).WithError(err).Error("validate launch tasks failed")

        h.metrics.LaunchTasksInvalid.Inc(1)

        return &hostsvc.LaunchTasksResponse{
            Error: &hostsvc.LaunchTasksResponse_Error{
                InvalidArgument: &hostsvc.InvalidArgument{
                    Message: err.Error(),
                },
            },
        }, errors.Wrap(err, "validate launch tasks failed")
    }

    hostToTaskIDs := make(map[string][]*peloton.TaskID)
    for _, launchableTask := range req.GetTasks() {
        hostHeld := h.offerPool.GetHostHeldForTask(launchableTask.GetId())
        if len(hostHeld) != 0 {
            hostToTaskIDs[hostHeld] =
                append(hostToTaskIDs[hostHeld], launchableTask.GetId())
        }
    }

    for hostname, taskIDs := range hostToTaskIDs {
        if hostname != req.GetHostname() {
            log.WithFields(log.Fields{
                "task_ids":      taskIDs,
                "host_held":     hostname,
                "host_launched": req.GetHostname(),
            }).Info("task not launched on the host held")

            if err := h.offerPool.ReleaseHoldForTasks(hostname, taskIDs); err != nil {
                log.WithFields(log.Fields{
                    "task_ids":  taskIDs,
                    "host_held": hostname,
                    "error":     err,
                }).Warn("fail to release held host when launching tasks on other hosts")
                continue
            }
        }
    }

    _, err = h.offerPool.ClaimForLaunch(
        req.GetHostname(),
        req.GetId().GetValue(),
        req.GetTasks(),
        hostToTaskIDs[req.GetHostname()]...,
    )
    if err != nil {
        log.WithFields(log.Fields{
            "hostname":       req.GetHostname(),
            "host_offer_id":  req.GetId(),
            "mesos_agent_id": req.GetAgentId(),
        }).WithError(err).Error("claim for launch failed")

        h.metrics.LaunchTasksInvalidOffers.Inc(1)

        return &hostsvc.LaunchTasksResponse{
            Error: &hostsvc.LaunchTasksResponse_Error{
                InvalidOffers: &hostsvc.InvalidOffers{
                    Message: err.Error(),
                },
            },
        }, errors.Wrap(err, "claim for launch failed")
    }

    // temporary workaround to add hosts into cache. This step
    // was part of ClaimForLaunch
    h.hostCache.AddPodsToHost(req.GetTasks(), req.GetHostname())

    var launchablePods []*models.LaunchablePod
    for _, task := range req.GetTasks() {
        jobID, instanceID, err := util.ParseJobAndInstanceID(task.GetTaskId().GetValue())
        if err != nil {
            log.WithFields(
                log.Fields{
                    "mesos_id": task.GetTaskId().GetValue(),
                }).WithError(err).
Error("fail to parse ID when constructing launchable pods in LaunchTask") continue } launchablePods = append(launchablePods, &models.LaunchablePod{ PodId: util.CreatePodIDFromMesosTaskID(task.GetTaskId()), Spec: api.ConvertTaskConfigToPodSpec(task.GetConfig(), jobID, instanceID), Ports: task.Ports, }) } launchedPods, err := h.plugin.LaunchPods(ctx, launchablePods, req.GetHostname()) if err != nil { h.metrics.LaunchTasksFail.Inc(int64(len(req.GetTasks()))) log.WithFields(log.Fields{ "error": err, "host_offer_id": req.GetId().GetValue(), }).Warn("Tasks launch failure") return &hostsvc.LaunchTasksResponse{ Error: &hostsvc.LaunchTasksResponse_Error{ LaunchFailure: &hostsvc.LaunchFailure{ Message: err.Error(), }, }, }, errors.Wrap(err, "task launch failed") } h.metrics.LaunchTasks.Inc(int64(len(launchedPods))) var taskIDs []string for _, pod := range launchedPods { taskIDs = append(taskIDs, pod.PodId.GetValue()) } log.WithFields(log.Fields{ "task_ids": taskIDs, "hostname": req.GetHostname(), "host_offer_id": req.GetId().GetValue(), }).Info("LaunchTasks") return &hostsvc.LaunchTasksResponse{}, nil } func validateLaunchTasks(request *hostsvc.LaunchTasksRequest) error { if len(request.Tasks) <= 0 { return errEmptyTaskList } if len(request.GetAgentId().GetValue()) <= 0 { return errEmptyAgentID } if len(request.Hostname) <= 0 { return errEmptyHostName } if len(request.GetId().GetValue()) <= 0 { return errEmptyHostOfferID } return nil } // ShutdownExecutors implements InternalHostService.ShutdownExecutors. func (h *ServiceHandler) ShutdownExecutors( ctx context.Context, body *hostsvc.ShutdownExecutorsRequest, ) (response *hostsvc.ShutdownExecutorsResponse, err error) { defer func() { log.WithField("request", body).Debug("ShutdownExecutor called.") if err != nil { err = yarpcutil.ConvertToYARPCError(err) return } }() shutdownExecutors := body.GetExecutors() if err = validateShutdownExecutors(body); err != nil { err = yarpcerrors.InvalidArgumentErrorf("%s", err) h.metrics.ShutdownExecutorsInvalid.Inc(1) return &hostsvc.ShutdownExecutorsResponse{ Error: &hostsvc.ShutdownExecutorsResponse_Error{ InvalidExecutors: &hostsvc.InvalidExecutors{ Message: err.Error(), }, }, }, errors.Wrap(err, "invalid shutdown executor request") } var wg sync.WaitGroup failedMutex := &sync.Mutex{} var failedExecutors []*hostsvc.ExecutorOnAgent var errs []string for _, shutdownExecutor := range shutdownExecutors { wg.Add(1) go func(shutdownExecutor *hostsvc.ExecutorOnAgent) { defer wg.Done() executorID := shutdownExecutor.GetExecutorId() agentID := shutdownExecutor.GetAgentId() callType := sched.Call_SHUTDOWN msg := &sched.Call{ FrameworkId: h.frameworkInfoProvider.GetFrameworkID(ctx), Type: &callType, Shutdown: &sched.Call_Shutdown{ ExecutorId: executorID, AgentId: agentID, }, } msid := h.frameworkInfoProvider.GetMesosStreamID(ctx) err := h.schedulerClient.Call(msid, msg) if err != nil { h.metrics.ShutdownExecutorsFail.Inc(1) log.WithFields(log.Fields{ "executor_id": executorID, "agent_id": agentID, "error": err, }).Error("Shutdown executor failure") failedMutex.Lock() defer failedMutex.Unlock() failedExecutors = append( failedExecutors, shutdownExecutor) errs = append(errs, err.Error()) return } h.metrics.ShutdownExecutors.Inc(1) log.WithFields(log.Fields{ "executor_id": executorID, "agent_id": agentID, }).Info("Shutdown executor request sent") }(shutdownExecutor) } wg.Wait() if len(failedExecutors) > 0 { err = errors.New("unable to shutdown executors") return &hostsvc.ShutdownExecutorsResponse{ Error: 
&hostsvc.ShutdownExecutorsResponse_Error{ ShutdownFailure: &hostsvc.ShutdownFailure{ Message: strings.Join(errs, ";"), Executors: failedExecutors, }, }, }, err } return &hostsvc.ShutdownExecutorsResponse{}, nil } func validateShutdownExecutors(request *hostsvc.ShutdownExecutorsRequest) error { executorList := request.GetExecutors() if len(executorList) <= 0 { return errEmptyExecutorList } for _, executor := range executorList { if executor.GetAgentId() == nil { return errEmptyAgentID } if executor.GetExecutorId() == nil { return errEmptyExecutorID } } return nil } // KillAndReserveTasks implements InternalHostService.KillAndReserveTasks. func (h *ServiceHandler) KillAndReserveTasks( ctx context.Context, body *hostsvc.KillAndReserveTasksRequest, ) (*hostsvc.KillAndReserveTasksResponse, error) { var err error defer func() { if err != nil { err = yarpcutil.ConvertToYARPCError(err) return } }() var taskIDs []*mesos.TaskID heldHostToTaskIDs := make(map[string][]*peloton.TaskID) for _, entry := range body.GetEntries() { taskIDs = append(taskIDs, entry.GetTaskId()) heldHostToTaskIDs[entry.GetHostToReserve()] = append(heldHostToTaskIDs[entry.GetHostToReserve()], entry.GetId()) } // reserve the hosts first for hostname, taskIDs := range heldHostToTaskIDs { // fail to reserve hosts should not fail kill, // just fail the in-place update and move on // to kill the task if err := h.offerPool.HoldForTasks(hostname, taskIDs); err != nil { log.WithFields(log.Fields{ "hostname": hostname, "task_ids": taskIDs, }).WithError(err). Warn("fail to hold the host") continue } } // then kill the tasks invalidTaskIDs, killFailure := h.killTasks(ctx, taskIDs) if invalidTaskIDs == nil && killFailure == nil { err = errors.New("unable to kill tasks") return &hostsvc.KillAndReserveTasksResponse{}, nil } resp := &hostsvc.KillAndReserveTasksResponse{ Error: &hostsvc.KillAndReserveTasksResponse_Error{ InvalidTaskIDs: invalidTaskIDs, KillFailure: killFailure, }, } // if task kill fails, try to release the tasks // TODO: can release only the failed tasks, for now it is ok, since // one task is killed in each call to the API. for hostname, taskIDs := range heldHostToTaskIDs { if err := h.offerPool.ReleaseHoldForTasks(hostname, taskIDs); err != nil { log.WithFields(log.Fields{ "hostname": hostname, "task_ids": taskIDs, }).WithError(err). Warn("fail to release hold on host after task kill fail") continue } } return resp, nil } // KillTasks implements InternalHostService.KillTasks. func (h *ServiceHandler) KillTasks( ctx context.Context, body *hostsvc.KillTasksRequest) ( *hostsvc.KillTasksResponse, error) { var err error defer func() { if err != nil { err = yarpcutil.ConvertToYARPCError(err) return } }() invalidTaskIDs, killFailure := h.killTasks(ctx, body.GetTaskIds()) // release all tasks even if some kill fails, because it is not certain // if the kill request does go through. // Worst case for releasing host when a task is not killed is in-place update // fails to place the task on the desired host. 
    h.releaseHostsHeldForTasks(parseTaskIDsFromMesosTaskIDs(body.GetTaskIds()))

    if invalidTaskIDs != nil || killFailure != nil {
        err = errors.New("unable to kill tasks")
        return &hostsvc.KillTasksResponse{
            Error: &hostsvc.KillTasksResponse_Error{
                InvalidTaskIDs: invalidTaskIDs,
                KillFailure:    killFailure,
            },
        }, nil
    }

    return &hostsvc.KillTasksResponse{}, nil
}

func (h *ServiceHandler) killTasks(
    ctx context.Context,
    taskIds []*mesos.TaskID) (
    *hostsvc.InvalidTaskIDs, *hostsvc.KillFailure) {

    if len(taskIds) == 0 {
        return &hostsvc.InvalidTaskIDs{Message: "Empty task ids"}, nil
    }

    if h.disableKillTasks.Load() {
        return nil, &hostsvc.KillFailure{Message: "Kill tasks request is disabled"}
    }

    var failedTaskIds []*mesos.TaskID
    var errs []string

    for _, taskID := range taskIds {
        if err := h.plugin.KillPod(ctx, taskID.GetValue()); err != nil {
            errs = append(errs, err.Error())
            failedTaskIds = append(failedTaskIds, taskID)
            h.metrics.KillTasksFail.Inc(1)
        } else {
            h.metrics.KillTasks.Inc(1)
        }
    }

    if len(failedTaskIds) > 0 {
        return nil, &hostsvc.KillFailure{
            Message: strings.Join(errs, ";"),
            TaskIds: failedTaskIds,
        }
    }

    return nil, nil
}

// ReserveResources implements InternalHostService.ReserveResources.
func (h *ServiceHandler) ReserveResources(
    ctx context.Context,
    body *hostsvc.ReserveResourcesRequest) (
    *hostsvc.ReserveResourcesResponse, error) {
    log.Debug("ReserveResources called.")
    return nil, fmt.Errorf("Unimplemented")
}

// UnreserveResources implements InternalHostService.UnreserveResources.
func (h *ServiceHandler) UnreserveResources(
    ctx context.Context,
    body *hostsvc.UnreserveResourcesRequest) (
    *hostsvc.UnreserveResourcesResponse, error) {
    log.Debug("UnreserveResources called.")
    return nil, fmt.Errorf("Unimplemented")
}

// CreateVolumes implements InternalHostService.CreateVolumes.
func (h *ServiceHandler) CreateVolumes(
    ctx context.Context,
    body *hostsvc.CreateVolumesRequest) (
    *hostsvc.CreateVolumesResponse, error) {
    log.Debug("CreateVolumes called.")
    return nil, fmt.Errorf("Unimplemented")
}

// DestroyVolumes implements InternalHostService.DestroyVolumes.
func (h *ServiceHandler) DestroyVolumes(
    ctx context.Context,
    body *hostsvc.DestroyVolumesRequest) (
    *hostsvc.DestroyVolumesResponse, error) {
    log.Debug("DestroyVolumes called.")
    return nil, fmt.Errorf("Unimplemented")
}

// ClusterCapacity fetches the allocated resources to the framework.
func (h *ServiceHandler) ClusterCapacity(
    ctx context.Context,
    body *hostsvc.ClusterCapacityRequest,
) (response *hostsvc.ClusterCapacityResponse, err error) {
    defer func() {
        if err != nil {
            h.metrics.ClusterCapacityFail.Inc(1)
            err = yarpcutil.ConvertToYARPCError(err)
            return
        }

        h.metrics.ClusterCapacity.Inc(1)
        h.metrics.RefreshClusterCapacityGauges(response)
    }()

    frameWorkID := h.frameworkInfoProvider.GetFrameworkID(ctx)
    if len(frameWorkID.GetValue()) == 0 {
        return &hostsvc.ClusterCapacityResponse{
            Error: &hostsvc.ClusterCapacityResponse_Error{
                ClusterUnavailable: &hostsvc.ClusterUnavailable{
                    Message: "unable to fetch framework ID",
                },
            },
        }, nil
    }

    allocatedResources, _, err := h.operatorMasterClient.
        GetTasksAllocation(frameWorkID.GetValue())
    if err != nil {
        log.WithError(err).Error("error making cluster capacity request")

        return &hostsvc.ClusterCapacityResponse{
            Error: &hostsvc.ClusterCapacityResponse_Error{
                ClusterUnavailable: &hostsvc.ClusterUnavailable{
                    Message: err.Error(),
                },
            },
        }, errors.Wrap(err, "error making cluster capacity request")
    }

    agentMap := host.GetAgentMap()
    if agentMap == nil || len(agentMap.RegisteredAgents) == 0 {
        log.Error("error getting host agentmap")
        return &hostsvc.ClusterCapacityResponse{
            Error: &hostsvc.ClusterCapacityResponse_Error{
                ClusterUnavailable: &hostsvc.ClusterUnavailable{
                    Message: "error getting host agentmap",
                },
            },
        }, nil
    }

    nonRevocableClusterCapacity := agentMap.Capacity

    // NOTE: This only works if
    // 1) no quota is set for any role, or
    // 2) quota is set for the same role that peloton is registered under.
    // If the operator sets a quota for another role but leaves peloton's
    // role unset, cluster capacity will be overestimated.
    quotaResources, err := h.operatorMasterClient.GetQuota(h.roleName)
    if err != nil {
        log.WithError(err).Error("error getting quota")

        return &hostsvc.ClusterCapacityResponse{
            Error: &hostsvc.ClusterCapacityResponse_Error{
                ClusterUnavailable: &hostsvc.ClusterUnavailable{
                    Message: err.Error(),
                },
            },
        }, errors.Wrap(err, "error getting quota")
    }

    if err == nil && quotaResources != nil {
        nonRevocableClusterCapacity = scalar.FromMesosResources(quotaResources)

        if nonRevocableClusterCapacity.GetCPU() <= 0 {
            nonRevocableClusterCapacity.CPU = agentMap.Capacity.GetCPU()
        }
        if nonRevocableClusterCapacity.GetMem() <= 0 {
            nonRevocableClusterCapacity.Mem = agentMap.Capacity.GetMem()
        }
        if nonRevocableClusterCapacity.GetDisk() <= 0 {
            nonRevocableClusterCapacity.Disk = agentMap.Capacity.GetDisk()
        }
        if nonRevocableClusterCapacity.GetGPU() <= 0 {
            nonRevocableClusterCapacity.GPU = agentMap.Capacity.GetGPU()
        }
    }

    revocableAllocated, nonRevocableAllocated := scalar.FilterMesosResources(
        allocatedResources,
        func(r *mesos.Resource) bool {
            return r.GetRevocable() != nil &&
                hmutil.IsSlackResourceType(r.GetName(), h.slackResourceTypes)
        })

    physicalAllocated := scalar.FromMesosResources(nonRevocableAllocated)
    slackAllocated := scalar.FromMesosResources(revocableAllocated)

    response = &hostsvc.ClusterCapacityResponse{
        Resources:               toHostSvcResources(&physicalAllocated),
        AllocatedSlackResources: toHostSvcResources(&slackAllocated),
        PhysicalResources:       toHostSvcResources(&nonRevocableClusterCapacity),
        PhysicalSlackResources:  toHostSvcResources(&agentMap.SlackCapacity),
    }

    return response, nil
}

// GetMesosMasterHostPort returns the leader Mesos master hostname and port.
func (h *ServiceHandler) GetMesosMasterHostPort(
    ctx context.Context,
    body *hostsvc.MesosMasterHostPortRequest,
) (response *hostsvc.MesosMasterHostPortResponse, err error) {
    defer func() {
        if err != nil {
            h.metrics.GetMesosMasterHostPortFail.Inc(1)
            err = yarpcutil.ConvertToYARPCError(err)
            return
        }

        h.metrics.GetMesosMasterHostPort.Inc(1)
    }()

    mesosMasterInfo := strings.Split(h.mesosDetector.HostPort(), ":")
    if len(mesosMasterInfo) != 2 {
        err = errors.New("unable to fetch leader mesos master hostname & port")
        return nil, err
    }
    response = &hostsvc.MesosMasterHostPortResponse{
        Hostname: mesosMasterInfo[0],
        Port:     mesosMasterInfo[1],
    }

    return response, nil
}

// ReserveHosts reserves the host for a specified task in the request.
// Host Manager keeps the host offers to itself until it has enough offers
// for the reservation; once that is fulfilled, it returns the reservation
// with the offer to the placement engine. Until the reservation is
// fulfilled or the reservation times out, offers from that host are not
// given to any other placement engine.
func (h *ServiceHandler) ReserveHosts(
    ctx context.Context,
    req *hostsvc.ReserveHostsRequest,
) (*hostsvc.ReserveHostsResponse, error) {
    log.WithField("request", req).Debug("ReserveHosts called.")

    if err := validateReserveHosts(req); err != nil {
        return &hostsvc.ReserveHostsResponse{
            Error: &hostsvc.ReserveHostsResponse_Error{
                Failed: &hostsvc.ReservationFailed{
                    Message: err.Error(),
                },
            },
        }, nil
    }

    err := h.reserver.EnqueueReservation(ctx, req.Reservation)
    if err != nil {
        log.WithError(err).Error("failed to enqueue reservation")
        return &hostsvc.ReserveHostsResponse{
            Error: &hostsvc.ReserveHostsResponse_Error{
                Failed: &hostsvc.ReservationFailed{
                    Message: errReservationNotFound.Error(),
                },
            },
        }, nil
    }

    log.Debug("ReserveHosts returned.")
    return &hostsvc.ReserveHostsResponse{}, nil
}

func validateReserveHosts(req *hostsvc.ReserveHostsRequest) error {
    if req.GetReservation() == nil {
        return errNilReservation
    }
    return nil
}

// GetCompletedReservations gets the completed host reservations from the
// reserver. It returns the list of completed reservations
// (hostsvc.CompletedReservation) or a NotFound error.
func (h *ServiceHandler) GetCompletedReservations(
    ctx context.Context,
    req *hostsvc.GetCompletedReservationRequest,
) (*hostsvc.GetCompletedReservationResponse, error) {
    log.WithField("request", req).Debug("GetCompletedReservations called.")

    completedReservations, err := h.reserver.DequeueCompletedReservation(
        ctx,
        _completedReservationLimit)
    if err != nil {
        return &hostsvc.GetCompletedReservationResponse{
            Error: &hostsvc.GetCompletedReservationResponse_Error{
                NotFound: &hostsvc.NotFound{
                    Message: err.Error(),
                },
            },
        }, nil
    }

    log.Debug("GetCompletedReservations returned.")
    return &hostsvc.GetCompletedReservationResponse{
        CompletedReservations: completedReservations,
    }, nil
}

// GetDrainingHosts implements InternalHostService.GetDrainingHosts.
func (h *ServiceHandler) GetDrainingHosts(
    ctx context.Context,
    request *hostsvc.GetDrainingHostsRequest,
) (*hostsvc.GetDrainingHostsResponse, error) {
    h.metrics.GetDrainingHosts.Inc(1)
    limit := request.GetLimit()

    // Get all hosts in maintenance from DB
    hostInfos, err := h.hostInfoOps.GetAll(ctx)
    if err != nil {
        h.metrics.GetDrainingHostsFail.Inc(1)
        return nil, err
    }

    // Filter in only hosts in DRAINING state
    var hostnames []string
    for _, h := range hostInfos {
        if limit > 0 && uint32(len(hostnames)) == limit {
            break
        }
        if h.GetState() == hpb.HostState_HOST_STATE_DRAINING {
            hostnames = append(hostnames, h.GetHostname())
        }
    }

    log.WithField("hostnames", hostnames).
        Debug("draining hosts returned by GetDrainingHosts")

    return &hostsvc.GetDrainingHostsResponse{
        Hostnames: hostnames,
    }, nil
}

// MarkHostDrained implements InternalHostService.MarkHostDrained.
// It marks the host as drained. This method is called by the Resource Manager
// drainer when there are no tasks left on the DRAINING host.
func (h *ServiceHandler) MarkHostDrained(
    ctx context.Context,
    request *hostsvc.MarkHostDrainedRequest,
) (*hostsvc.MarkHostDrainedResponse, error) {
    // Get host from DB
    hostInfo, err := h.hostInfoOps.Get(ctx, request.GetHostname())
    if err != nil {
        return nil, err
    }

    // Validate that the host's current state is DRAINING
    if hostInfo.GetState() != hpb.HostState_HOST_STATE_DRAINING {
        log.WithField("hostname", request.GetHostname()).
            Error("host cannot be marked as drained since it is not in draining state")
        return nil, yarpcerrors.NotFoundErrorf("Host not in DRAINING state")
    }

    // Update host state to DRAINED in DB
    if err := h.hostInfoOps.UpdateState(
        ctx,
        request.GetHostname(),
        hpb.HostState_HOST_STATE_DRAINED); err != nil {
        return nil, err
    }
    log.WithField("hostname", request.GetHostname()).Info("host marked as drained")

    // Enqueue into the Goal State Engine
    h.goalStateDriver.EnqueueHost(request.GetHostname(), time.Now())

    h.metrics.MarkHostDrained.Inc(1)

    return &hostsvc.MarkHostDrainedResponse{
        Hostname: request.GetHostname(),
    }, nil
}

// GetMesosAgentInfo implements InternalHostService.GetMesosAgentInfo.
// Returns Mesos agent info for a single agent or all agents.
func (h *ServiceHandler) GetMesosAgentInfo(
    ctx context.Context,
    request *hostsvc.GetMesosAgentInfoRequest,
) (*hostsvc.GetMesosAgentInfoResponse, error) {
    r := &hostsvc.GetMesosAgentInfoResponse{}

    hostname := request.GetHostname()
    agentMap := host.GetAgentMap()
    if agentMap != nil {
        if hostname != "" {
            if info, ok := agentMap.RegisteredAgents[hostname]; ok {
                r.Agents = append(r.Agents, info)
            } else {
                r.Error = &hostsvc.GetMesosAgentInfoResponse_Error{
                    HostNotFound: &hostsvc.HostNotFound{
                        Message: "host not found",
                    },
                }
            }
        } else {
            for _, info := range agentMap.RegisteredAgents {
                r.Agents = append(r.Agents, info)
            }
        }
    }
    return r, nil
}

// ReleaseHostsHeldForTasks releases the hosts which are held for the tasks provided.
func (h *ServiceHandler) ReleaseHostsHeldForTasks(
    ctx context.Context,
    req *hostsvc.ReleaseHostsHeldForTasksRequest,
) (*hostsvc.ReleaseHostsHeldForTasksResponse, error) {
    if err := h.releaseHostsHeldForTasks(req.GetIds()); err != nil {
        return &hostsvc.ReleaseHostsHeldForTasksResponse{
            Error: &hostsvc.ReleaseHostsHeldForTasksResponse_Error{
                Message: err.Error(),
            },
        }, nil
    }

    return &hostsvc.ReleaseHostsHeldForTasksResponse{}, nil
}

// GetTasksByHostState gets tasks on hosts in the specified host state.
func (h *ServiceHandler) GetTasksByHostState(
    ctx context.Context,
    req *hostsvc.GetTasksByHostStateRequest,
) (response *hostsvc.GetTasksByHostStateResponse, err error) {
    return &hostsvc.GetTasksByHostStateResponse{}, nil
}

func (h *ServiceHandler) releaseHostsHeldForTasks(taskIDs []*peloton.TaskID) error {
    var errs []error
    hostHeldForTasks := make(map[string][]*peloton.TaskID)
    for _, taskID := range taskIDs {
        hostname := h.offerPool.GetHostHeldForTask(taskID)
        if len(hostname) != 0 {
            hostHeldForTasks[hostname] = append(hostHeldForTasks[hostname], taskID)
        }
    }

    for hostname, taskIDs := range hostHeldForTasks {
        if err := h.offerPool.ReleaseHoldForTasks(hostname, taskIDs); err != nil {
            errs = append(errs, err)
        }
    }

    if len(errs) == 0 {
        return nil
    }

    return multierr.Combine(errs...)
}

// GetHostPoolCapacity fetches the resources for all host-pools.
func (h *ServiceHandler) GetHostPoolCapacity(
    ctx context.Context,
    body *hostsvc.GetHostPoolCapacityRequest,
) (response *hostsvc.GetHostPoolCapacityResponse, err error) {
    if h.hostPoolManager == nil {
        err = yarpcerrors.UnimplementedErrorf("host pools not enabled")
        return
    }
    response = &hostsvc.GetHostPoolCapacityResponse{}
    for _, p := range h.hostPoolManager.Pools() {
        cap := p.Capacity()
        hpResource := &hostsvc.HostPoolResources{
            PoolName:         p.ID(),
            PhysicalCapacity: toHostSvcResources(&cap.Physical),
            SlackCapacity:    toHostSvcResources(&cap.Slack),
        }
        response.Pools = append(response.Pools, hpResource)
    }
    return
}

// Helper function to convert scalar.Resources into hostsvc format.
func toHostSvcResources(rs *scalar.Resources) []*hostsvc.Resource {
    return []*hostsvc.Resource{
        {
            Kind:     common.CPU,
            Capacity: rs.CPU,
        }, {
            Kind:     common.DISK,
            Capacity: rs.Disk,
        }, {
            Kind:     common.GPU,
            Capacity: rs.GPU,
        }, {
            Kind:     common.MEMORY,
            Capacity: rs.Mem,
        },
    }
}

// Helper function to convert summary.HostStatus to string.
func toHostStatus(hostStatus summary.HostStatus) string {
    var status string
    switch hostStatus {
    case summary.ReadyHost:
        status = "ready"
    case summary.PlacingHost:
        status = "placing"
    case summary.ReservedHost:
        status = "reserved"
    case summary.HeldHost:
        status = "held"
    default:
        status = "unknown"
    }
    return status
}

func parseTaskIDsFromMesosTaskIDs(ids []*mesos.TaskID) []*peloton.TaskID {
    var result []*peloton.TaskID
    for _, id := range ids {
        if taskID, err := util.ParseTaskIDFromMesosTaskID(id.GetValue()); err == nil {
            result = append(result, &peloton.TaskID{Value: taskID})
        } else {
            log.WithError(err).
                Error("unexpected mesos task id")
        }
    }
    return result
}

// NewTestServiceHandler returns an empty new ServiceHandler ptr for testing.
func NewTestServiceHandler() *ServiceHandler {
    return &ServiceHandler{}
}
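
The handler above only registers the server side of peloton.private.hostmgr.InternalHostService (via BuildInternalHostServiceYARPCProcedures). For orientation, here is a minimal client-side sketch, separate from handler.go, showing how a caller might acquire and then return host offers over the generated YARPC client. The client constructor name follows the standard protoc-gen-yarpc-go pattern that matches the server registration above; the outbound service name, transport, address, and the empty HostFilter are illustrative placeholders, not values taken from this file.

package main

import (
    "context"
    "log"
    "time"

    "github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"

    "go.uber.org/yarpc"
    "go.uber.org/yarpc/transport/grpc"
)

// acquireAndRelease acquires host offers from host manager, logs them, and
// returns the unused offers. A real caller would populate the HostFilter
// with quantity and resource constraints; it only needs to be non-nil to
// pass validateHostFilter.
func acquireAndRelease(ctx context.Context, client hostsvc.InternalHostServiceYARPCClient) error {
    resp, err := client.AcquireHostOffers(ctx, &hostsvc.AcquireHostOffersRequest{
        Filter: &hostsvc.HostFilter{},
    })
    if err != nil {
        return err
    }
    if respErr := resp.GetError(); respErr != nil {
        log.Printf("AcquireHostOffers returned error: %v", respErr)
        return nil
    }
    for _, offer := range resp.GetHostOffers() {
        log.Printf("acquired offer %s on host %s", offer.GetId().GetValue(), offer.GetHostname())
    }

    // Hand back everything that was not used for a launch.
    _, err = client.ReleaseHostOffers(ctx, &hostsvc.ReleaseHostOffersRequest{
        HostOffers: resp.GetHostOffers(),
    })
    return err
}

func main() {
    // Outbound pointing at a host manager instance; the service name,
    // transport, and address here are placeholders.
    t := grpc.NewTransport()
    d := yarpc.NewDispatcher(yarpc.Config{
        Name: "hostmgr-client",
        Outbounds: yarpc.Outbounds{
            "peloton-hostmgr": {Unary: t.NewSingleOutbound("127.0.0.1:5391")},
        },
    })
    if err := d.Start(); err != nil {
        log.Fatal(err)
    }
    defer d.Stop()

    client := hostsvc.NewInternalHostServiceYARPCClient(d.ClientConfig("peloton-hostmgr"))

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    if err := acquireAndRelease(ctx, client); err != nil {
        log.Fatal(err)
    }
}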