pkg/placement/reserver/reserver.go (322 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package reserver import ( "context" "errors" "fmt" "math" "math/rand" "reflect" "sync" "time" "github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc" "github.com/uber/peloton/.gen/peloton/private/resmgr" "github.com/uber/peloton/pkg/common/async" "github.com/uber/peloton/pkg/common/queue" "github.com/uber/peloton/pkg/placement/config" "github.com/uber/peloton/pkg/placement/hosts" tally_metrics "github.com/uber/peloton/pkg/placement/metrics" "github.com/uber/peloton/pkg/placement/models" models_v0 "github.com/uber/peloton/pkg/placement/models/v0" "github.com/uber/peloton/pkg/placement/tasks" log "github.com/sirupsen/logrus" ) const ( // number of randomized hosts which will be choosen from // all the hosts _randomizedHosts = 10 // _noTasksTimeoutPenalty is the timeout value for a get tasks request. _noHostsTimeoutPenalty = 1 * time.Second // _noTasksTimeoutPenalty is the timeout value for a get tasks request. _noTasksTimeoutPenalty = 1 * time.Second _completedReservationQueue = "completed-ReservationQueue" // represents the max size of the preemption queue _maxReservationQueueSize = 10000 // reservation queue name _reservationQueue = "reservation-queue" ) var ( errNoValidCompletedReservation = errors.New("no valid completed reservations found") ) // Reserver represents a placement engine's reservation module // It gets all the hosts based on filter passed to host manager // it chooses the random host from the list and call reserve the // chosen host based on the task. type Reserver interface { // Adding daemon interface for Reserver async.Daemon // GetReservationQueue returns the reservation queue GetReservationQueue() queue.Queue // Reserve reserves the task to host in hostmanager Reserve(ctx context.Context) (time.Duration, error) // Places the assignments which are ready for host reservation into reservationQueue ProcessHostReservation(ctx context.Context, assignments []models.Task) error // EnqueueReservation enqueues the hostsvc.reservation to // the reservation queue EnqueueReservation(reservation *hostsvc.Reservation) error // GetCompletedReservation gets the completed tasks with offers // by that placement can be created GetCompletedReservation(ctx context.Context) ([]*hostsvc.CompletedReservation, error) } // reserver is the struct which implements Reserver interface type reserver struct { lock sync.Mutex // Placement config for the reserver config *config.PlacementConfig // Placement engine metrics metrics *tally_metrics.Metrics // hostService for accessing the host manager for getting host list // as well as reserving host hostService hosts.Service // taskService for placing task on reserved host taskService tasks.Service // daemon object for making reserver a daemon process daemon async.Daemon // reservation queue of type resmgr.task which placement engine enqueues the tasks from // resourcemanager for reserver to make the reservation reservationQueue queue.Queue // completed reservation queue completedReservationQueue queue.Queue // task-> reservation mapping reservations map[string][]*models_v0.Host // tasks map indexed by taskID tasks map[string]*resmgr.Task } // NewReserver creates a new reserver which gets the tasks from the reservationQueue // and based on the requirements from the task get the hosts list and randomly choose // the host and make the reservation on that host for the task. func NewReserver( metrics *tally_metrics.Metrics, cfg *config.PlacementConfig, hostsService hosts.Service, taskService tasks.Service) Reserver { reserver := &reserver{ config: cfg, hostService: hostsService, taskService: taskService, metrics: metrics, reservationQueue: queue.NewQueue( _reservationQueue, reflect.TypeOf(hostsvc.Reservation{}), _maxReservationQueueSize, ), completedReservationQueue: queue.NewQueue( _completedReservationQueue, reflect.TypeOf(hostsvc.CompletedReservation{}), _maxReservationQueueSize, ), reservations: make(map[string][]*models_v0.Host), tasks: make(map[string]*resmgr.Task), } reserver.daemon = async.NewDaemon("Placement Engine Reserver", reserver) return reserver } // Start method starts the daemon process func (r *reserver) Start() { r.daemon.Start() r.metrics.Running.Update(1) } // Run method implements runnable from daemon // this is the method which gets called while starting the // daemon process. func (r *reserver) Run(ctx context.Context) error { timer := time.NewTimer(time.Duration(0)) for { select { case <-ctx.Done(): if !timer.Stop() { <-timer.C } return ctx.Err() case <-timer.C: } delay, err := r.Reserve(ctx) if err != nil { log.WithError(err).Info("tasks can't reserve hosts") } err = r.enqueueCompletedReservation(ctx) if err != nil { log.WithError(err).Debug("error finding completed reservation") } // We need to process the completed reservations err = r.processCompletedReservations(ctx) if err != nil { log.WithError(err).Info("error in processing completed reservations") } timer.Reset(delay) } } // Stop method will stop the daemon process. func (r *reserver) Stop() { r.daemon.Stop() r.metrics.Running.Update(0) } // Reserve method is being called from Run method // This method does following steps // 1. Get Tasks from the reservation queue // 2. Find the hosts list from hostmanager matching filter // 3. choose one random host from the list // 4. reserve the host in host manager func (r *reserver) Reserve(ctx context.Context) (time.Duration, error) { // Get reservation from the reservation queue item, err := r.reservationQueue.Dequeue(1 * time.Second) if err != nil { log.Debug("No items in reservation queue") return _noTasksTimeoutPenalty, nil } reservation, ok := item.(*hostsvc.Reservation) if !ok || reservation.GetTask() == nil || reservation.GetTask().GetId() == nil { return _noTasksTimeoutPenalty, fmt.Errorf("not a valid task %s", reservation.GetTask()) } // storing the tasks task := reservation.GetTask() r.tasks[task.GetId().Value] = task log.WithFields(log.Fields{ "task": task.Id.Value, }).Debug("Reserving host for task") hostFilter := r.getHostFilter(task) // Find the hosts list from hostmanager matching filter hosts, err := r.hostService.GetHosts(ctx, task, hostFilter) if err != nil { log.WithFields(log.Fields{ "host_filter": hostFilter, "task": task.Id, }).Info("Couldn't acquire hosts for task") return _noHostsTimeoutPenalty, err } var hostToReserve []*models_v0.Host // choose one random host from the list hostToReserve = append(hostToReserve, r.findHost(hosts)) // reserve the host in host manager if err := r.hostService.ReserveHost(ctx, hostToReserve, task); err != nil { log.WithFields(log.Fields{ "host": hostToReserve[0].GetHost().Hostname, "task": task.Id.Value, }).Info("Host could not be reserved") return _noHostsTimeoutPenalty, err } //Updating the task to hosts map r.reservations[task.GetId().Value] = hostToReserve log.WithFields(log.Fields{ "host": hostToReserve[0].GetHost().Hostname, "task": task.Id.Value, }).Info("Host reserved for task") return time.Duration(0), nil } // findHost randomly chooses the number of hosts and then // out of those hosts choose the one which have lowest number // of tasks running func (r *reserver) findHost(hosts []*models_v0.Host) *models_v0.Host { lenHosts := len(hosts) lenRandomHosts := int(math.Min( float64(_randomizedHosts), float64(lenHosts))) randomHosts := make([]*models_v0.Host, lenRandomHosts) for i := 0; i < lenRandomHosts; i++ { randomHosts[i] = hosts[random(0, lenHosts)+0] } return r.findHostWithMinTasks(randomHosts) } // findHostWithMinTasks returns the host which has the minimun running task // from the list of hosts provided func (r *reserver) findHostWithMinTasks(hosts []*models_v0.Host) *models_v0.Host { min := taskLen(hosts[0]) minIndex := 0 for i, host := range hosts { if min >= taskLen(host) { min = taskLen(host) minIndex = i } } return hosts[minIndex] } func taskLen(host *models_v0.Host) int { if host.GetTasks() == nil { return 0 } return len(host.GetTasks()) } func random(min, max int) int { rand.Seed(time.Now().Unix()) return rand.Intn(max-min) + min } func (r *reserver) getHostFilter(task *resmgr.Task) *hostsvc.HostFilter { result := &hostsvc.HostFilter{ ResourceConstraint: &hostsvc.ResourceConstraint{ Minimum: task.Resource, NumPorts: task.NumPorts, }, } if constraint := task.Constraint; constraint != nil { result.SchedulingConstraint = constraint } return result } // ProcessHostReservation places the assignments which are ready for host reservation // into reservationQueue func (r *reserver) ProcessHostReservation( ctx context.Context, assignments []models.Task) error { for _, assignment := range assignments { if assignment.IsReadyForHostReservation() { log.WithField("assignment", assignment). Debug("process host reservation") err := r.EnqueueReservation(&hostsvc.Reservation{ Task: assignment.GetResmgrTaskV0(), }) if err != nil { return err } } } return nil } // GetReservationQueue gets the reszervation queue func (r *reserver) GetReservationQueue() queue.Queue { return r.reservationQueue } func (r *reserver) GetCompletedReservation(ctx context.Context, ) ([]*hostsvc.CompletedReservation, error) { var reservations []*hostsvc.CompletedReservation item, err := r.completedReservationQueue.Dequeue(100 * time.Millisecond) if err != nil { if _, isTimeout := err.(queue.DequeueTimeOutError); !isTimeout { // error is not due to timeout so return return reservations, err } // we timed out, lets return return reservations, nil } res, ok := item.(*hostsvc.CompletedReservation) if !ok { // this should never happen return reservations, errors.New("invalid item in queue") } if res != nil { reservations = append(reservations, res) } return reservations, nil } // enqueueCompletedReservation gets out the completed reservations from // hosts service and if any, enqueues them into completed reservation // queue so that the handler can create placements out of them. func (r *reserver) enqueueCompletedReservation(ctx context.Context) error { // Call hosts service to find the reservation reservations, err := r.hostService.GetCompletedReservation(ctx) if err != nil { return err } // Taking a lock here on reserver r.lock.Lock() defer r.lock.Unlock() // found the valid reservations for _, res := range reservations { // Check if reservation is succeeded or not // by looking at the offers length // if offers length is zero that means // we need to reserve if res.HostOffer == nil { err := r.reserveAgain(res.GetTask()) if err != nil { log.WithError(err). Errorf("task %s could not be reserved, "+ "dropping it ", res.GetTask().GetId().Value) } continue } // Valid completed reservation found err := r.completedReservationQueue.Enqueue(res) if err != nil { // Assuming placing timeout in hostmanager will // make the host available log.WithError(err).WithFields(log.Fields{ "completed_reservation": res, }).Errorf("task %s could not be send for placement, "+ "dropping it ", res.GetTask().GetId().Value) continue } // clean the completed reservation r.cleanReservation(res.GetTask()) } return nil } // processCompletedReservations will be processing completed reservations // and it will set the placements in resmgr func (r *reserver) processCompletedReservations(ctx context.Context) error { reservations, err := r.GetCompletedReservation(ctx) if err != nil { return err } if len(reservations) == 0 { log.Debug("no valid reservations") return nil } now := time.Now() maxRounds := r.config.MaxRounds.Value(reservations[0].GetTask().Type) duration := r.config.MaxDurations.Value(reservations[0].GetTask().Type) deadline := now.Add(duration) desiredHostPlacementDeadline := now.Add(r.config.MaxDesiredHostPlacementDuration) assignments := make([]models.Task, len(reservations)) for i, res := range reservations { task := models_v0.NewTask(nil, res.GetTask(), deadline, desiredHostPlacementDeadline, maxRounds, ) assignments[i] = models_v0.NewAssignment(task) assignments[i].SetPlacement(&models_v0.HostOffers{Offer: res.HostOffer}) } log.WithField("placements", assignments). Debug("Process completed reservations") r.taskService.SetPlacements(ctx, assignments, nil) return nil } func (r *reserver) reserveAgain(task *resmgr.Task) error { // cleaning the reservation r.cleanReservation(task) // enqueuing the task again for the reservation err := r.reservationQueue.Enqueue( &hostsvc.Reservation{ Task: task, }) if err != nil { return err } return nil } func (r *reserver) cleanReservation(task *resmgr.Task) { if task != nil { delete(r.reservations, task.GetId().Value) delete(r.tasks, task.GetId().Value) } } func (r *reserver) EnqueueReservation(reservation *hostsvc.Reservation) error { if reservation == nil { return errors.New("invalid reservation") } err := r.reservationQueue.Enqueue(reservation) if err != nil { return err } return nil }