pkg/hostmgr/p2k/plugins/mesos/mesos.go

// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mesos

import (
	"context"
	"sync"
	"time"

	mesos "github.com/uber/peloton/.gen/mesos/v1"
	mesosmaster "github.com/uber/peloton/.gen/mesos/v1/master"
	sched "github.com/uber/peloton/.gen/mesos/v1/scheduler"
	v0peloton "github.com/uber/peloton/.gen/peloton/api/v0/peloton"
	"github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton"
	pbpod "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
	"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"

	"github.com/uber/peloton/pkg/common/api"
	"github.com/uber/peloton/pkg/common/lifecycle"
	"github.com/uber/peloton/pkg/common/util"
	"github.com/uber/peloton/pkg/hostmgr/factory/task"
	hostmgrmesos "github.com/uber/peloton/pkg/hostmgr/mesos"
	"github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/encoding/mpb"
	"github.com/uber/peloton/pkg/hostmgr/models"
	"github.com/uber/peloton/pkg/hostmgr/p2k/scalar"
	hmscalar "github.com/uber/peloton/pkg/hostmgr/scalar"

	log "github.com/sirupsen/logrus"
	"github.com/uber-go/tally"
	"go.uber.org/yarpc"
	"go.uber.org/yarpc/yarpcerrors"
)

const mesosTaskUpdateAckChanSize = 1000

// MesosManager implements the plugin for the Mesos cluster manager.
type MesosManager struct {
	// dispatcher for yarpc
	d *yarpc.Dispatcher

	lf lifecycle.LifeCycle

	// Pod events channel. Used to send pod events upstream to the
	// event stream.
	podEventCh chan<- *scalar.PodEvent

	// Host events channel. Used to send host offers and capacity
	// information to the host cache.
	hostEventCh chan<- *scalar.HostEvent

	offerManager *offerManager

	frameworkInfoProvider hostmgrmesos.FrameworkInfoProvider

	schedulerClient mpb.SchedulerClient

	// updateAckConcurrency is the number of concurrent ack workers.
	updateAckConcurrency int

	// ackChannel buffers the pod events to be acknowledged. AckPodEvent
	// adds an event to be acked to this channel; ackPodEventWorker
	// consumes the event and sends an ack back to Mesos.
	ackChannel chan *scalar.PodEvent

	// Map of outstanding Mesos task status update acknowledgements,
	// used to dedupe identical events.
	ackStatusMap sync.Map

	metrics *metrics

	once sync.Once

	agentSyncer *agentSyncer

	// Map from agentID to hostname.
	// A Mesos task update event carries only an agent ID, but Peloton
	// needs the hostname to decide which host summary and other caches
	// to populate. The Mesos plugin therefore maintains this map by
	// digesting host agent info, and looks up the hostname for the
	// agentID when an event comes in.
	agentIDToHostname sync.Map
}
// defaultUpdateAckConcurrency is an assumed default: the code as given
// never set updateAckConcurrency, which would leave the ack channel
// without any workers.
const defaultUpdateAckConcurrency = 10

// NewMesosManager creates a MesosManager wired to the given yarpc
// dispatcher, Mesos clients, and event channels.
func NewMesosManager(
	d *yarpc.Dispatcher,
	frameworkInfoProvider hostmgrmesos.FrameworkInfoProvider,
	schedulerClient mpb.SchedulerClient,
	operatorClient mpb.MasterOperatorClient,
	agentInfoRefreshInterval time.Duration,
	offerHoldTime time.Duration,
	scope tally.Scope,
	podEventCh chan<- *scalar.PodEvent,
	hostEventCh chan<- *scalar.HostEvent,
) *MesosManager {
	return &MesosManager{
		d:                     d,
		lf:                    lifecycle.NewLifeCycle(),
		metrics:               newMetrics(scope.SubScope("mesos_manager")),
		frameworkInfoProvider: frameworkInfoProvider,
		schedulerClient:       schedulerClient,
		podEventCh:            podEventCh,
		hostEventCh:           hostEventCh,
		offerManager:          newOfferManager(offerHoldTime),
		ackChannel:            make(chan *scalar.PodEvent, mesosTaskUpdateAckChanSize),
		// Assumed fix: without this assignment no ack workers would be
		// started by startAsyncProcessTaskUpdates.
		updateAckConcurrency: defaultUpdateAckConcurrency,
		once:                 sync.Once{},
		agentSyncer: newAgentSyncer(
			operatorClient,
			agentInfoRefreshInterval,
		),
	}
}

// Start the plugin.
func (m *MesosManager) Start() error {
	if !m.lf.Start() {
		// already started, skip the action
		return nil
	}

	// TODO: remove comment after MesosManager takes over mesos callback
	//m.once.Do(func() {
	//	procedures := map[sched.Event_Type]interface{}{
	//		sched.Event_OFFERS:  m.Offers,
	//		sched.Event_RESCIND: m.Rescind,
	//		sched.Event_UPDATE:  m.Update,
	//	}
	//
	//	for typ, hdl := range procedures {
	//		name := typ.String()
	//		mpb.Register(m.d, hostmgrmesos.ServiceName, mpb.Procedure(name, hdl))
	//	}
	//})

	m.agentSyncer.Start()
	m.startProcessAgentInfo(m.agentSyncer.AgentCh())
	m.startAsyncProcessTaskUpdates()
	return nil
}

// Stop the plugin.
func (m *MesosManager) Stop() {
	if !m.lf.Stop() {
		// already stopped, skip the action
		return
	}
	m.agentSyncer.Stop()
	m.offerManager.Clear()
}
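// The sketch below is illustrative only and not part of the original
// file: it shows how a caller might wire up and start the plugin. The
// function name, channel sizes, and durations are assumptions, not
// values taken from this package.
func newMesosManagerForExample(
	d *yarpc.Dispatcher,
	provider hostmgrmesos.FrameworkInfoProvider,
	schedulerClient mpb.SchedulerClient,
	operatorClient mpb.MasterOperatorClient,
	scope tally.Scope,
) (*MesosManager, error) {
	podEventCh := make(chan *scalar.PodEvent, 1000)   // assumed size
	hostEventCh := make(chan *scalar.HostEvent, 1000) // assumed size
	m := NewMesosManager(
		d,
		provider,
		schedulerClient,
		operatorClient,
		30*time.Second, // agent info refresh interval (assumed)
		10*time.Minute, // offer hold time (assumed)
		scope,
		podEventCh,
		hostEventCh,
	)
	// Start launches the agent syncer and the ack worker pool; the
	// caller is expected to drain podEventCh and hostEventCh.
	if err := m.Start(); err != nil {
		return nil, err
	}
	return m, nil
}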
// LaunchPods launches a list of pods on a host.
func (m *MesosManager) LaunchPods(
	ctx context.Context,
	pods []*models.LaunchablePod,
	hostname string,
) ([]*models.LaunchablePod, error) {
	var offerIds []*mesos.OfferID
	var mesosResources []*mesos.Resource
	var mesosTasks []*mesos.TaskInfo
	var mesosTaskIds []string

	offers := m.offerManager.GetOffers(hostname)

	for _, offer := range offers {
		offerIds = append(offerIds, offer.GetId())
		mesosResources = append(mesosResources, offer.GetResources()...)
	}

	if len(offerIds) == 0 {
		return nil, yarpcerrors.InternalErrorf("no offer found to launch pods on %s", hostname)
	}

	builder := task.NewBuilder(mesosResources)
	// Assume only one agent per host, i.e. the agentID is the same for
	// all offers from the same host.
	agentID := offers[offerIds[0].GetValue()].GetAgentId()

	for _, pod := range pods {
		launchableTask, err := convertPodSpecToLaunchableTask(pod.PodId, pod.Spec, pod.Ports)
		if err != nil {
			return nil, err
		}

		mesosTask, err := builder.Build(launchableTask)
		if err != nil {
			return nil, err
		}
		mesosTask.AgentId = agentID
		mesosTasks = append(mesosTasks, mesosTask)
		mesosTaskIds = append(mesosTaskIds, mesosTask.GetTaskId().GetValue())
	}

	callType := sched.Call_ACCEPT
	opType := mesos.Offer_Operation_LAUNCH
	msg := &sched.Call{
		FrameworkId: m.frameworkInfoProvider.GetFrameworkID(ctx),
		Type:        &callType,
		Accept: &sched.Call_Accept{
			OfferIds: offerIds,
			Operations: []*mesos.Offer_Operation{
				{
					Type: &opType,
					Launch: &mesos.Offer_Operation_Launch{
						TaskInfos: mesosTasks,
					},
				},
			},
		},
	}

	msid := m.frameworkInfoProvider.GetMesosStreamID(ctx)
	err := m.schedulerClient.Call(msid, msg)
	if err != nil {
		// Decline the offers upon launch failure in a best-effort
		// manner, because Peloton no longer holds the offers in the
		// host summary. If the launch did not go through, Mesos will
		// send new offers with the resources; if it did go through,
		// the decline does not affect the launched tasks. This is
		// still only best-effort cleanup: Peloton still relies on
		// offer expiration to clean up any offers left behind.
		m.offerManager.RemoveOfferForHost(hostname)
		m.declineOffers(ctx, offerIds)
		m.metrics.LaunchPodFail.Inc(1)
		return nil, err
	}

	// The call to Mesos succeeded; remove the offers so that no new
	// task is placed on them.
	m.offerManager.RemoveOfferForHost(hostname)
	m.metrics.LaunchPod.Inc(1)
	return pods, nil
}

// declineOffers calls the Mesos master to decline a list of offers.
func (m *MesosManager) declineOffers(
	ctx context.Context,
	offerIDs []*mesos.OfferID) error {
	callType := sched.Call_DECLINE
	msg := &sched.Call{
		FrameworkId: m.frameworkInfoProvider.GetFrameworkID(ctx),
		Type:        &callType,
		Decline: &sched.Call_Decline{
			OfferIds: offerIDs,
		},
	}
	msid := m.frameworkInfoProvider.GetMesosStreamID(ctx)
	err := m.schedulerClient.Call(msid, msg)
	if err != nil {
		// Ideally, we assume that Mesos has offer_timeout configured,
		// so if the decline call fails, the offers are eventually
		// invalidated by Mesos anyway.
		log.WithError(err).
			WithField("call", msg).
			Warn("Failed to decline offers.")
		m.metrics.DeclineOffersFail.Inc(1)
		return err
	}

	m.metrics.DeclineOffers.Inc(int64(len(offerIDs)))
	return nil
}

// KillPod kills a pod on a host.
func (m *MesosManager) KillPod(ctx context.Context, podID string) error {
	callType := sched.Call_KILL
	msg := &sched.Call{
		FrameworkId: m.frameworkInfoProvider.GetFrameworkID(ctx),
		Type:        &callType,
		Kill: &sched.Call_Kill{
			TaskId: &mesos.TaskID{Value: &podID},
		},
	}

	err := m.schedulerClient.Call(
		m.frameworkInfoProvider.GetMesosStreamID(ctx),
		msg,
	)
	if err != nil {
		m.metrics.KillPodFail.Inc(1)
	} else {
		m.metrics.KillPod.Inc(1)
	}
	return err
}

// AckPodEvent is only implemented by the Mesos plugin; for K8s it is a
// noop.
func (m *MesosManager) AckPodEvent(
	event *scalar.PodEvent,
) {
	// Add the event to the Mesos task status update ack channel and
	// handle it asynchronously.
	m.ackChannel <- event
}

// startAsyncProcessTaskUpdates starts workers that concurrently process
// task status update events; an event is ready to ACK iff its UUID is
// not empty.
func (m *MesosManager) startAsyncProcessTaskUpdates() {
	for i := 0; i < m.updateAckConcurrency; i++ {
		go m.ackPodEventWorker()
	}
}

func (m *MesosManager) ackPodEventWorker() {
	for pe := range m.ackChannel {
		// Events without a UUID need no ack.
		if pe.EventID == "" {
			continue
		}

		// Dedupe the event.
		if _, ok := m.ackStatusMap.Load(pe.EventID); ok {
			m.metrics.TaskUpdateAckDeDupe.Inc(1)
			continue
		}

		// This is a new event to be acknowledged; add it to the dedupe
		// map of acks.
		m.ackStatusMap.Store(pe.EventID, struct{}{})

		// If the ack fails at the Mesos master, the agent will resend
		// the status update.
		if err := m.acknowledgeTaskUpdate(
			context.Background(),
			pe,
		); err != nil {
			log.WithField("pod_event", pe.Event).
				WithError(err).
				Error("Failed to acknowledgeTaskUpdate")
		}

		// Once acked, delete the event from the dedupe map.
		m.ackStatusMap.Delete(pe.EventID)
	}
}
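// Illustrative sketch (not part of the original file): how an upstream
// consumer hands a processed status update back for acknowledgement.
// Events whose EventID (the Mesos status UUID) is empty are skipped by
// the worker, so callers may pass every event through unconditionally.
func ackProcessedEventExample(m *MesosManager, pe *scalar.PodEvent) {
	// AckPodEvent only enqueues; the ack workers started in Start()
	// dedupe by EventID and perform the actual scheduler ACK call.
	m.AckPodEvent(pe)
}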
// acknowledgeTaskUpdate ACKs a task status update event through a
// scheduler client call to the Mesos master.
func (m *MesosManager) acknowledgeTaskUpdate(
	ctx context.Context,
	e *scalar.PodEvent) error {
	pe := e.Event
	m.metrics.TaskUpdateAck.Inc(1)
	callType := sched.Call_ACKNOWLEDGE
	msid := hostmgrmesos.GetSchedulerDriver().GetMesosStreamID(ctx)
	agentIDStr := pe.GetAgentId()
	taskIdStr := pe.GetPodId().GetValue()
	msg := &sched.Call{
		FrameworkId: hostmgrmesos.GetSchedulerDriver().GetFrameworkID(ctx),
		Type:        &callType,
		Acknowledge: &sched.Call_Acknowledge{
			AgentId: &mesos.AgentID{Value: &agentIDStr},
			TaskId:  &mesos.TaskID{Value: &taskIdStr},
			Uuid:    []byte(e.EventID),
		},
	}
	if err := m.schedulerClient.Call(msid, msg); err != nil {
		return err
	}
	log.WithField("task_status", pe).Debug("Acked task update")
	return nil
}

func (m *MesosManager) startProcessAgentInfo(
	agentCh <-chan []*mesosmaster.Response_GetAgents_Agent,
) {
	// The first batch needs to be populated synchronously, so that by
	// the time MesosManager starts receiving Mesos events, the
	// agentIDToHostname map is ready.
	m.processAgentHostMap(<-agentCh)
	go func() {
		for {
			select {
			case agents := <-agentCh:
				m.processAgentHostMap(agents)
			case <-m.lf.StopCh():
				return
			}
		}
	}()
}

func (m *MesosManager) processAgentHostMap(
	agents []*mesosmaster.Response_GetAgents_Agent,
) {
	for _, agent := range agents {
		agentID := agent.GetAgentInfo().GetId().GetValue()
		hostname := agent.GetAgentInfo().GetHostname()
		m.agentIDToHostname.Store(agentID, hostname)

		// The snippet as given iterated over all agents again inside
		// this loop, emitting n^2 capacity events paired with the
		// wrong hostname; a single pass per agent is the assumed fix.
		capacity := models.HostResources{
			NonSlack: hmscalar.FromMesosResources(agent.GetTotalResources()),
		}
		m.hostEventCh <- scalar.BuildHostEventFromResource(
			hostname,
			models.HostResources{},
			capacity,
			scalar.UpdateAgent,
		)
	}
}

// ReconcileHosts will return the current state of hosts in the cluster.
func (m *MesosManager) ReconcileHosts() ([]*scalar.HostInfo, error) {
	// TODO: fill in implementation
	return nil, nil
}

// Offers is the Mesos callback that delivers offers from the master.
// TODO: add metrics similar to what offerpool has
func (m *MesosManager) Offers(ctx context.Context, body *sched.Event) error {
	event := body.GetOffers()
	log.WithField("event", event).Info("MesosManager: processing Offer event")
	hosts := m.offerManager.AddOffers(event.Offers)

	for host := range hosts {
		// TODO: extract slack and non-slack resources from the offer
		// manager.
		availableResources := models.HostResources{
			NonSlack: m.offerManager.GetResources(host),
		}
		evt := scalar.BuildHostEventFromResource(
			host,
			availableResources,
			models.HostResources{},
			scalar.UpdateHostAvailableRes,
		)
		m.hostEventCh <- evt
	}

	return nil
}

// Rescind is the Mesos callback that rescinds a single offer.
func (m *MesosManager) Rescind(ctx context.Context, body *sched.Event) error {
	event := body.GetRescind()
	log.WithField("event", event).Info("OfferManager: processing Rescind event")
	host := m.offerManager.RemoveOffer(event.GetOfferId().GetValue())

	if len(host) != 0 {
		availableResources := models.HostResources{
			NonSlack: m.offerManager.GetResources(host),
		}
		evt := scalar.BuildHostEventFromResource(
			host,
			availableResources,
			models.HostResources{},
			scalar.UpdateHostAvailableRes,
		)
		m.hostEventCh <- evt
	}

	return nil
}
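// Offer lifecycle summary (inferred from the handlers in this file, as a
// reading aid rather than documented behavior): Offers() adds incoming
// offers to offerManager and publishes the host's available non-slack
// resources to the host cache; Rescind() removes a single offer and
// republishes; LaunchPods() consumes all offers held for a host,
// declining them best-effort on launch failure; anything left unused
// expires inside offerManager after offerHoldTime.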
// Update is the Mesos callback for task status updates.
func (m *MesosManager) Update(ctx context.Context, body *sched.Event) error {
	taskUpdate := body.GetUpdate()
	// TODO: implement watch processor notifications.

	hostname, ok := m.agentIDToHostname.Load(
		taskUpdate.GetStatus().GetAgentId().GetValue())
	if !ok {
		// The hostname was not found; maybe the agent info is not
		// populated yet. Return directly and wait for Mesos to resend
		// the event.
		m.metrics.AgentIDToHostnameMissing.Inc(1)
		log.WithField("agent_id",
			taskUpdate.GetStatus().GetAgentId().GetValue(),
		).Warn("cannot find hostname for agent_id")
		return nil
	}

	// Hand the event off to the channel so downstream processing does
	// not block this Mesos callback.
	m.podEventCh <- buildPodEventFromMesosTaskStatus(taskUpdate, hostname.(string))
	m.metrics.TaskUpdateCounter.Inc(1)
	taskStateCounter := m.metrics.scope.Counter(
		"task_state_" + taskUpdate.GetStatus().GetState().String())
	taskStateCounter.Inc(1)
	return nil
}

// convertPodSpecToLaunchableTask converts a v1alpha pod spec into a
// Mesos-launchable task.
func convertPodSpecToLaunchableTask(
	id *peloton.PodID,
	spec *pbpod.PodSpec,
	ports map[string]uint32,
) (*hostsvc.LaunchableTask, error) {
	config, err := api.ConvertPodSpecToTaskConfig(spec)
	if err != nil {
		return nil, err
	}

	taskId := id.GetValue()
	return &hostsvc.LaunchableTask{
		TaskId: &mesos.TaskID{Value: &taskId},
		Config: config,
		Id:     &v0peloton.TaskID{Value: spec.GetPodName().GetValue()},
		Ports:  ports,
	}, nil
}

// buildPodEventFromMesosTaskStatus translates a Mesos task status update
// into the scalar.PodEvent consumed upstream.
func buildPodEventFromMesosTaskStatus(
	evt *sched.Event_Update,
	hostname string,
) *scalar.PodEvent {
	healthy := pbpod.HealthState_HEALTH_STATE_UNHEALTHY.String()
	if evt.GetStatus().GetHealthy() {
		healthy = pbpod.HealthState_HEALTH_STATE_HEALTHY.String()
	}
	taskState := util.MesosStateToPelotonState(evt.GetStatus().GetState())
	return &scalar.PodEvent{
		Event: &pbpod.PodEvent{
			PodId:       &peloton.PodID{Value: evt.GetStatus().GetTaskId().GetValue()},
			ActualState: api.ConvertTaskStateToPodState(taskState).String(),
			Timestamp: util.FormatTime(
				evt.GetStatus().GetTimestamp(),
				time.RFC3339Nano,
			),
			AgentId:  evt.GetStatus().GetAgentId().GetValue(),
			Hostname: hostname,
			Message:  evt.GetStatus().GetMessage(),
			Reason:   evt.GetStatus().GetReason().String(),
			Healthy:  healthy,
		},
		EventType: scalar.UpdatePod,
		EventID:   string(evt.GetStatus().GetUuid()),
	}
}
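// Illustrative sketch (not part of the original file): a minimal upstream
// consumer of the pod event channel. It forwards each event and then
// acknowledges it back to Mesos through the plugin; shutdown and error
// handling are elided.
func consumePodEventsExample(m *MesosManager, podEventCh <-chan *scalar.PodEvent) {
	for pe := range podEventCh {
		// Forward pe.Event to the event stream / resource manager here.
		log.WithField("pod_id", pe.Event.GetPodId().GetValue()).
			Debug("forwarding pod event upstream")
		m.AckPodEvent(pe)
	}
}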