pkg/hostmgr/offer/handler.go

// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package offer

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	mesos "github.com/uber/peloton/.gen/mesos/v1"
	sched "github.com/uber/peloton/.gen/mesos/v1/scheduler"
	pb_eventstream "github.com/uber/peloton/.gen/peloton/private/eventstream"
	"github.com/uber/peloton/.gen/peloton/private/resmgrsvc"

	"github.com/uber/peloton/pkg/common"
	"github.com/uber/peloton/pkg/common/background"
	"github.com/uber/peloton/pkg/common/backoff"
	"github.com/uber/peloton/pkg/common/cirbuf"
	"github.com/uber/peloton/pkg/common/eventstream"
	"github.com/uber/peloton/pkg/common/util"
	"github.com/uber/peloton/pkg/hostmgr/binpacking"
	"github.com/uber/peloton/pkg/hostmgr/config"
	"github.com/uber/peloton/pkg/hostmgr/hostpool/manager"
	hostmgr_mesos "github.com/uber/peloton/pkg/hostmgr/mesos"
	"github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/encoding/mpb"
	"github.com/uber/peloton/pkg/hostmgr/offer/offerpool"
	mesosplugins "github.com/uber/peloton/pkg/hostmgr/p2k/plugins/mesos"
	"github.com/uber/peloton/pkg/hostmgr/prune"
	"github.com/uber/peloton/pkg/hostmgr/watchevent"

	"github.com/pborman/uuid"
	log "github.com/sirupsen/logrus"
	uatomic "github.com/uber-go/atomic"
	"github.com/uber-go/tally"
	"go.uber.org/yarpc"
)

const (
	_heldHostPrunerName            = "heldHostPruner"
	_placingHostPrunerName         = "placingHostPruner"
	_binPackingRefresherName       = "binPackingRefresher"
	_poolMetricsRefresh            = "poolMetricsRefresh"
	_taskStatusUpdateStreamRefresh = "taskStatusUpdateStreamRefresh"
	_poolMetricsRefreshPeriod      = 10 * time.Second
	_notifyResourceManagerPeriod   = 10 * time.Second
)

// EventHandler defines the interface for the offer event handler that is
// called by leader election callbacks.
type EventHandler interface {
	// Start starts the offer event handler, after which the handler will be
	// ready to process offer events from a Mesos inbound.
	// Offers sent to the handler before Start() could be silently discarded.
	Start() error

	// Stop stops the offer event handler and clears cached offers in the pool.
	// Offers sent to the handler after Stop() could be silently discarded.
	Stop() error

	// GetOfferPool returns the underlying Pool holding the offers.
	GetOfferPool() offerpool.Pool

	// GetEventStreamHandler returns the handler for the eventstream.
	GetEventStreamHandler() *eventstream.Handler

	// SetHostPoolManager sets the host pool manager in the event handler.
	// It should be called during event handler initialization.
	SetHostPoolManager(manager manager.HostPoolManager)
}

// Singleton event handler for offers and mesos status update events.
var handler *eventHandler
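// Illustrative sketch (not part of the original file; the function name is
// hypothetical): leader election callbacks are expected to drive the
// handler lifecycle roughly like this, starting the handler on gaining
// leadership and stopping it on losing leadership.
func exampleLeadershipLifecycle() {
	h := GetEventHandler()

	// On gaining leadership: offers received before Start() may be dropped.
	if err := h.Start(); err != nil {
		log.WithError(err).Error("failed to start offer event handler")
	}

	// ... process offers while leader ...

	// On losing leadership: Stop() clears all cached offers in the pool.
	if err := h.Stop(); err != nil {
		log.WithError(err).Error("failed to stop offer event handler")
	}
}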
// eventHandler has handlers for mesos callbacks for offers,
// status updates and more.
type eventHandler struct {
	eventStreamHandler *eventstream.Handler
	schedulerclient    mpb.SchedulerClient
	watchProcessor     watchevent.WatchProcessor

	offerPool   offerpool.Pool
	offerPruner Pruner

	updateAckConcurrency int

	// Buffers the mesos task status updates to be acknowledged.
	ackChannel chan *mesos.TaskStatus

	// Map to store outstanding mesos task status update acknowledgements,
	// used to dedupe the same event.
	ackStatusMap sync.Map

	// Temporary measure to pass mesos events into the mesos plugin.
	mesosPlugin *mesosplugins.MesosManager

	metrics *Metrics
}

// Represents the eventstream that persists mesos status update events,
// which are consumed by Job Manager and Resource Manager.
// Once both clients process a mesos status update event, it is purged
// from the eventstream and an acknowledgement is sent to the Mesos master.
var eventStreamHandler *eventstream.EventHandler

// eventForwarder is the struct to forward status update events to
// resource manager. It implements eventstream.EventHandler and
// forwards the events to the remote side in the OnV0Events function.
type eventForwarder struct {
	// client to send NotifyTaskUpdatesRequest.
	client resmgrsvc.ResourceManagerServiceYARPCClient

	// Tracks the purge progress returned from the remote side.
	progress *uint64
}

// GetEventProgress returns the event forward progress.
func (f *eventForwarder) GetEventProgress() uint64 {
	return atomic.LoadUint64(f.progress)
}
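// Illustrative sketch (not part of the original file; the function name and
// offset value are hypothetical): OnV0Events stores the purge offset
// acknowledged by resource manager, and the local event stream client is
// expected to read it back through GetEventProgress to decide how far the
// buffered events may be purged.
func exampleForwarderProgress() {
	var progress uint64
	f := &eventForwarder{progress: &progress}

	// What OnV0Events does after a successful NotifyTaskUpdates call:
	atomic.StoreUint64(f.progress, 42)

	// What the event stream client periodically observes:
	_ = f.GetEventProgress() // == 42
}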
// InitEventHandler initializes the event handler for offers.
func InitEventHandler(
	d *yarpc.Dispatcher,
	parent tally.Scope,
	schedulerClient mpb.SchedulerClient,
	resmgrClient resmgrsvc.ResourceManagerServiceYARPCClient,
	backgroundMgr background.Manager,
	ranker binpacking.Ranker,
	hostMgrConfig config.Config,
	processor watchevent.WatchProcessor,
	hostPoolManager manager.HostPoolManager,
	mesosPlugin *mesosplugins.MesosManager,
) {
	if handler != nil {
		log.Warning("Offer event handler has already been initialized")
		return
	}
	metrics := offerpool.NewMetrics(parent)
	pool := offerpool.NewOfferPool(
		time.Duration(hostMgrConfig.OfferHoldTimeSec)*time.Second,
		schedulerClient,
		metrics,
		hostmgr_mesos.GetSchedulerDriver(),
		hostMgrConfig.ScarceResourceTypes,
		hostMgrConfig.SlackResourceTypes,
		ranker,
		hostMgrConfig.HostPlacingOfferStatusTimeout,
		processor,
		hostPoolManager,
	)

	placingHostPruner := prune.NewPlacingHostPruner(
		pool,
		parent.SubScope(_placingHostPrunerName),
	)
	backgroundMgr.RegisterWorks(
		background.Work{
			Name:   _placingHostPrunerName,
			Func:   placingHostPruner.Prune,
			Period: hostMgrConfig.HostPruningPeriodSec,
		},
	)

	heldHostPruner := prune.NewHeldHostPruner(
		pool,
		parent.SubScope(_heldHostPrunerName),
	)
	backgroundMgr.RegisterWorks(
		background.Work{
			Name:   _heldHostPrunerName,
			Func:   heldHostPruner.Prune,
			Period: hostMgrConfig.HeldHostPruningPeriodSec,
		},
	)

	binPackingRefresher := offerpool.NewRefresher(
		pool,
	)
	backgroundMgr.RegisterWorks(
		background.Work{
			Name:   _binPackingRefresherName,
			Func:   binPackingRefresher.Refresh,
			Period: hostMgrConfig.BinPackingRefreshIntervalSec,
		},
	)

	backgroundMgr.RegisterWorks(
		background.Work{
			Name: _poolMetricsRefresh,
			Func: func(_ *uatomic.Bool) {
				pool.RefreshGaugeMaps()
			},
			Period: _poolMetricsRefreshPeriod,
		},
	)

	// TODO: refactor OfferPruner as a background worker.
	handler = &eventHandler{
		schedulerclient: schedulerClient,
		watchProcessor:  processor,
		offerPool:       pool,
		offerPruner: NewOfferPruner(
			pool,
			time.Duration(hostMgrConfig.OfferPruningPeriodSec)*time.Second,
			metrics),
		metrics: NewMetrics(parent),
		ackChannel: make(chan *mesos.TaskStatus,
			hostMgrConfig.TaskUpdateBufferSize),
		updateAckConcurrency: hostMgrConfig.TaskUpdateAckConcurrency,
		mesosPlugin:          mesosPlugin,
	}
	handler.eventStreamHandler = initEventStreamHandler(
		d,
		handler,
		hostMgrConfig.TaskUpdateBufferSize,
		parent.SubScope("EventStreamHandler"))

	initResMgrEventForwarder(
		handler.eventStreamHandler,
		resmgrClient,
		parent.SubScope("ResourceManagerClient"))

	handler.startAsyncProcessTaskUpdates()
	backgroundMgr.RegisterWorks(
		background.Work{
			Name: _taskStatusUpdateStreamRefresh,
			Func: func(_ *uatomic.Bool) {
				handler.UpdateCounters()
			},
			Period: _poolMetricsRefreshPeriod,
		},
	)

	procedures := map[sched.Event_Type]interface{}{
		sched.Event_OFFERS:                handler.Offers,
		sched.Event_INVERSE_OFFERS:        handler.InverseOffers,
		sched.Event_RESCIND:               handler.Rescind,
		sched.Event_RESCIND_INVERSE_OFFER: handler.RescindInverseOffer,
		sched.Event_UPDATE:                handler.Update,
	}

	for typ, hdl := range procedures {
		name := typ.String()
		mpb.Register(d, hostmgr_mesos.ServiceName, mpb.Procedure(name, hdl))
	}
}

// initResMgrEventForwarder creates an event stream client to push
// mesos task status update events from Host Manager to Resource Manager.
func initResMgrEventForwarder(
	eventStreamHandler *eventstream.Handler,
	client resmgrsvc.ResourceManagerServiceYARPCClient,
	scope tally.Scope) {
	var progress uint64
	eventForwarder := &eventForwarder{
		client:   client,
		progress: &progress,
	}
	eventstream.NewLocalEventStreamClient(
		common.PelotonResourceManager,
		eventStreamHandler,
		eventForwarder,
		scope,
	)
}

// initEventStreamHandler initializes the event stream for communicating
// task status updates with Job Manager and Resource Manager:
// Job Manager pulls task status update events from the event stream, while
// Host Manager uses an event stream client to push task status update
// events to Resource Manager.
func initEventStreamHandler(
	d *yarpc.Dispatcher,
	purgedEventsProcessor eventstream.PurgedEventsProcessor,
	bufferSize int,
	scope tally.Scope) *eventstream.Handler {
	eventStreamHandler := eventstream.NewEventStreamHandler(
		bufferSize,
		[]string{common.PelotonJobManager, common.PelotonResourceManager},
		purgedEventsProcessor,
		scope,
	)
	d.Register(pb_eventstream.BuildEventStreamServiceYARPCProcedures(eventStreamHandler))
	return eventStreamHandler
}

// GetEventHandler returns the handler for Mesos offer events. This
// function assumes the handler has been initialized as part of the
// InitEventHandler function.
// TODO: We should start a study of https://github.com/uber-common/inject
// and see whether we feel comfortable using it.
func GetEventHandler() EventHandler {
	if handler == nil {
		log.Fatal("Offer event handler is not initialized")
	}
	return handler
}

// GetEventStreamHandler returns the event stream handler.
func (h *eventHandler) GetEventStreamHandler() *eventstream.Handler {
	return h.eventStreamHandler
}
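// Illustrative sketch (not part of the original file; the function name is
// hypothetical): InitEventHandler registers one mpb procedure per Mesos
// scheduler event type, keyed by the event type's name, so that an inbound
// UPDATE event, for example, is routed to handler.Update. A single entry
// would be registered roughly like this.
func exampleRegisterUpdateProcedure(d *yarpc.Dispatcher) {
	mpb.Register(
		d,
		hostmgr_mesos.ServiceName,
		mpb.Procedure(sched.Event_UPDATE.String(), handler.Update),
	)
}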
// SetHostPoolManager sets the host pool manager in the event handler.
// It should be called during event handler initialization.
func (h *eventHandler) SetHostPoolManager(manager manager.HostPoolManager) {
	h.offerPool.SetHostPoolManager(manager)
}

// Offers is the mesos callback that sends the offers from the master.
func (h *eventHandler) Offers(ctx context.Context, body *sched.Event) error {
	event := body.GetOffers()
	for _, offer := range event.Offers {
		log.WithFields(log.Fields{
			"offer_id": offer.GetId().GetValue(),
			"hostname": offer.GetHostname(),
		}).Info("offers received")
	}
	h.offerPool.AddOffers(ctx, event.Offers)

	// Temporary measure to hook mesos plugins.
	h.mesosPlugin.Offers(ctx, body)
	return nil
}

// InverseOffers is the mesos callback that sends the InverseOffers from the master.
func (h *eventHandler) InverseOffers(ctx context.Context, body *sched.Event) error {
	event := body.GetInverseOffers()
	log.WithField("event", event).
		Debug("OfferManager: processing InverseOffers event")

	// TODO: Handle inverse offers from Mesos.
	return nil
}

// Rescind rescinds an offer.
func (h *eventHandler) Rescind(ctx context.Context, body *sched.Event) error {
	event := body.GetRescind()
	log.WithField("event", event).Debug("OfferManager: processing Rescind event")
	h.offerPool.RescindOffer(event.OfferId)

	// Temporary measure to hook mesos plugins.
	h.mesosPlugin.Rescind(ctx, body)
	return nil
}

// RescindInverseOffer rescinds an inverse offer.
func (h *eventHandler) RescindInverseOffer(ctx context.Context, body *sched.Event) error {
	event := body.GetRescindInverseOffer()
	log.WithField("event", event).
		Debug("OfferManager: processing RescindInverseOffer event")

	// TODO: Handle rescind inverse offer events.
	return nil
}

// Update is the mesos callback on task state updates.
func (h *eventHandler) Update(ctx context.Context, body *sched.Event) error {
	var err error
	var event *pb_eventstream.Event
	taskUpdate := body.GetUpdate()

	defer func() {
		if err == nil {
			h.watchProcessor.NotifyEventChange(event)
		}

		// Update the metrics in a goroutine to unblock the API callback.
		go func() {
			h.metrics.taskUpdateCounter.Inc(1)
			taskStateCounter := h.metrics.scope.Counter(
				"task_state_" + taskUpdate.GetStatus().GetState().String())
			taskStateCounter.Inc(1)
		}()
	}()

	event = &pb_eventstream.Event{
		MesosTaskStatus: taskUpdate.GetStatus(),
		Type:            pb_eventstream.Event_MESOS_TASK_STATUS,
	}
	err = h.eventStreamHandler.AddEvent(event)
	if err != nil {
		log.WithError(err).
			WithField("status_update", taskUpdate.GetStatus()).
			Error("Cannot add status update")
	}

	h.offerPool.UpdateTasksOnHost(
		taskUpdate.GetStatus().GetTaskId().GetValue(),
		util.MesosStateToPelotonState(taskUpdate.GetStatus().GetState()),
		nil)

	// Temporary measure to pass mesos events into the plugin.
	h.mesosPlugin.Update(ctx, body)

	// If the buffer is full, AddEvent fails and Peloton does not ack the
	// status update, so the Mesos master will resend it. Return nil
	// regardless; otherwise the framework would disconnect from the Mesos
	// master.
	return nil
}

// OnV0Event callback.
func (f *eventForwarder) OnV0Event(event *pb_eventstream.Event) {
	// Not implemented.
}
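// Illustrative sketch (not part of the original file; the function name is
// hypothetical): Update wraps every Mesos task status into an eventstream
// event of type MESOS_TASK_STATUS before buffering it, which is the shape
// that both Job Manager and Resource Manager later consume.
func exampleWrapTaskStatus(status *mesos.TaskStatus) *pb_eventstream.Event {
	return &pb_eventstream.Event{
		MesosTaskStatus: status,
		Type:            pb_eventstream.Event_MESOS_TASK_STATUS,
	}
}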
// OnV0Events callback. In this callback, a batch of events is forwarded to
// resource manager.
func (f *eventForwarder) OnV0Events(events []*pb_eventstream.Event) {
	if len(events) == 0 {
		return
	}

	var response *resmgrsvc.NotifyTaskUpdatesResponse
	request := &resmgrsvc.NotifyTaskUpdatesRequest{
		Events: events,
	}
	b := backoff.NewRetrier(backoff.NewRetryPolicy(
		1,
		_notifyResourceManagerPeriod))
	for {
		var err error
		response, err = f.notifyResourceManager(request)
		if err == nil || !backoff.CheckRetry(b) {
			break
		}
	}

	// response is nil if every attempt failed; there is no progress or
	// error to record in that case.
	if response == nil {
		return
	}
	if response.PurgeOffset > 0 {
		atomic.StoreUint64(f.progress, response.PurgeOffset)
	}
	if response.Error != nil {
		log.WithField("notify_task_updates_response_error", response.Error).
			Error("NotifyTaskUpdatesRequest failed")
	}
}

func (f *eventForwarder) notifyResourceManager(
	request *resmgrsvc.NotifyTaskUpdatesRequest) (
	*resmgrsvc.NotifyTaskUpdatesResponse, error) {
	ctx, cancelFunc := context.WithTimeout(
		context.Background(),
		_notifyResourceManagerPeriod)
	defer cancelFunc()
	return f.client.NotifyTaskUpdates(ctx, request)
}

// EventPurged implements the PurgedEventsProcessor interface. Purged mesos
// task status updates are queued for acknowledgement, deduped by UUID.
func (h *eventHandler) EventPurged(events []*cirbuf.CircularBufferItem) {
	for _, e := range events {
		event, ok := e.Value.(*pb_eventstream.Event)
		if !ok {
			// Should never happen.
			continue
		}

		if event.GetType() != pb_eventstream.Event_MESOS_TASK_STATUS {
			continue
		}

		uid := uuid.UUID(event.GetMesosTaskStatus().GetUuid()).String()
		if uid == "" {
			continue
		}

		_, ok = h.ackStatusMap.Load(uid)
		if ok {
			h.metrics.taskUpdateAckDeDupe.Inc(1)
			continue
		}
		h.ackStatusMap.Store(uid, struct{}{})
		h.ackChannel <- event.GetMesosTaskStatus()
	}
}

// startAsyncProcessTaskUpdates concurrently processes task status update
// events; an event is ready to ACK iff its uuid is not nil.
func (h *eventHandler) startAsyncProcessTaskUpdates() {
	for i := 0; i < h.updateAckConcurrency; i++ {
		go func() {
			for taskStatus := range h.ackChannel {
				uid := uuid.UUID(taskStatus.GetUuid()).String()
				// Once acked, delete from the map.
				// If the ack failed at the mesos master, the agent will
				// re-send the status update.
				h.ackStatusMap.Delete(uid)

				if err := h.acknowledgeTaskUpdate(
					context.Background(),
					taskStatus); err != nil {
					log.WithField("task_status", *taskStatus).
						WithError(err).
						Error("Failed to acknowledgeTaskUpdate")
				}
			}
		}()
	}
}

// acknowledgeTaskUpdate ACKs a task status update event via a scheduler
// client call to the Mesos master.
func (h *eventHandler) acknowledgeTaskUpdate(
	ctx context.Context,
	taskStatus *mesos.TaskStatus) error {
	h.metrics.taskUpdateAck.Inc(1)
	callType := sched.Call_ACKNOWLEDGE
	msid := hostmgr_mesos.GetSchedulerDriver().GetMesosStreamID(ctx)
	msg := &sched.Call{
		FrameworkId: hostmgr_mesos.GetSchedulerDriver().GetFrameworkID(ctx),
		Type:        &callType,
		Acknowledge: &sched.Call_Acknowledge{
			AgentId: taskStatus.AgentId,
			TaskId:  taskStatus.TaskId,
			Uuid:    taskStatus.Uuid,
		},
	}
	if err := h.schedulerclient.Call(msid, msg); err != nil {
		return err
	}
	log.WithField("task_status", *taskStatus).Debug("Acked task update")
	return nil
}

// UpdateCounters tracks the counts for task status updates and acks.
func (h *eventHandler) UpdateCounters() {
	h.metrics.taskAckChannelSize.Update(float64(len(h.ackChannel)))
	var length float64
	h.ackStatusMap.Range(func(key, _ interface{}) bool {
		length++
		return true
	})
	h.metrics.taskAckMapSize.Update(length)
}
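// Illustrative sketch (not part of the original file; the function name is
// hypothetical): the dedupe step of EventPurged in isolation. A status
// update whose UUID is already awaiting acknowledgement is dropped instead
// of being queued on ackChannel a second time; the workers started by
// startAsyncProcessTaskUpdates remove the UUID again before acking.
func exampleEnqueueAck(h *eventHandler, status *mesos.TaskStatus) {
	uid := uuid.UUID(status.GetUuid()).String()
	if _, pending := h.ackStatusMap.Load(uid); pending {
		return // duplicate; Mesos will re-send if the ack is lost
	}
	h.ackStatusMap.Store(uid, struct{}{})
	h.ackChannel <- status
}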
// GetOfferPool returns the underlying OfferPool.
func (h *eventHandler) GetOfferPool() offerpool.Pool {
	return h.offerPool
}

// Start runs startup related procedures.
func (h *eventHandler) Start() error {
	// Start the offer pruner.
	h.offerPruner.Start()

	// TODO: add error handling.
	return nil
}

// Stop runs shutdown related procedures.
func (h *eventHandler) Stop() error {
	// Clean up all existing offers.
	h.offerPool.Clear()

	// Stop the offer pruner.
	h.offerPruner.Stop()

	// TODO: add error handling.
	return nil
}
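// Illustrative sketch (not part of the original file; the function name is
// hypothetical): other host manager components are expected to reach the
// cached offers through the singleton accessor rather than holding the
// pool directly, e.g. to reset all offers.
func exampleClearOfferPool() {
	pool := GetEventHandler().GetOfferPool()
	pool.Clear() // drops every cached offer, as Stop() does
}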