pkg/placement/engine.go:

// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package placement

import (
    "context"
    "strings"
    "sync"
    "time"

    log "github.com/sirupsen/logrus"
    "github.com/uber-go/tally"

    "github.com/uber/peloton/pkg/common/async"
    "github.com/uber/peloton/pkg/placement/config"
    "github.com/uber/peloton/pkg/placement/hosts"
    tally_metrics "github.com/uber/peloton/pkg/placement/metrics"
    "github.com/uber/peloton/pkg/placement/models"
    "github.com/uber/peloton/pkg/placement/offers"
    "github.com/uber/peloton/pkg/placement/plugins"
    "github.com/uber/peloton/pkg/placement/reserver"
    "github.com/uber/peloton/pkg/placement/tasks"
)

const (
    // _noOffersTimeoutPenalty is the timeout value for a get offers request.
    _noOffersTimeoutPenalty = 1 * time.Second
    // _noTasksTimeoutPenalty is the timeout value for a get tasks request.
    _noTasksTimeoutPenalty = 1 * time.Second
    // error message for a task that failed to be placed
    _failedToPlaceTaskAfterTimeout = "failed to place task after timeout"
)

// Engine represents a placement engine that can be started and stopped.
type Engine interface {
    Start()
    Stop()
}

// New creates a new placement engine having one dedicated coordinator per task type.
func New(
    parent tally.Scope,
    cfg *config.PlacementConfig,
    offerService offers.Service,
    taskService tasks.Service,
    hostsService hosts.Service,
    strategy plugins.Strategy,
    pool *async.Pool) Engine {
    scope := tally_metrics.NewMetrics(
        parent.SubScope(strings.ToLower(cfg.TaskType.String())))
    engine := NewEngine(
        cfg,
        offerService,
        taskService,
        strategy,
        pool,
        scope,
        hostsService)
    return engine
}

// NewEngine creates a new placement engine.
func NewEngine(
    config *config.PlacementConfig,
    offerService offers.Service,
    taskService tasks.Service,
    strategy plugins.Strategy,
    pool *async.Pool,
    scope *tally_metrics.Metrics,
    hostsService hosts.Service) Engine {
    result := &engine{
        config:       config,
        offerService: offerService,
        taskService:  taskService,
        strategy:     strategy,
        pool:         pool,
        metrics:      scope,
    }
    result.daemon = async.NewDaemon("Placement Engine", result)
    result.reserver = reserver.NewReserver(scope, config, hostsService, taskService)
    return result
}

// engine is the Engine implementation that periodically dequeues tasks and
// places them onto acquired host offers.
type engine struct {
    config       *config.PlacementConfig
    metrics      *tally_metrics.Metrics
    pool         *async.Pool
    offerService offers.Service
    taskService  tasks.Service
    strategy     plugins.Strategy
    daemon       async.Daemon
    reserver     reserver.Reserver
}

func (e *engine) Start() {
    e.daemon.Start()
    e.reserver.Start()
    e.metrics.Running.Update(1)
}

// Run dequeues tasks and runs placement rounds until the context is cancelled.
func (e *engine) Run(ctx context.Context) error {
    log.WithField("dequeue_period", e.config.TaskDequeuePeriod.String()).
        WithField("dequeue_timeout", e.config.TaskDequeueTimeOut).
        WithField("dequeue_limit", e.config.TaskDequeueLimit).
        WithField("no_task_delay", _noTasksTimeoutPenalty).
        Info("Engine started")

    var unfulfilledAssignment []models.Task
    var delay time.Duration
    timer := time.NewTimer(e.config.TaskDequeuePeriod)
    for {
        select {
        case <-ctx.Done():
            if !timer.Stop() {
                <-timer.C
            }
            return ctx.Err()
        case <-timer.C:
        }
        unfulfilledAssignment, delay = e.Place(ctx, unfulfilledAssignment)
        log.WithField("delay", delay.String()).Debug("Placement delay")
        timer.Reset(delay)
    }
}

func (e *engine) Stop() {
    e.daemon.Stop()
    e.reserver.Stop()
    e.metrics.Running.Update(0)
}

// Place will let the coordinator do one placement round.
// It accepts the unfulfilled assignments from the last round and
// tries to process them in the current round.
// It returns the slice of assignments that could not be fulfilled and
// the delay before the next round of placing starts.
func (e *engine) Place(
    ctx context.Context,
    lastRoundAssignment []models.Task,
) ([]models.Task, time.Duration) {
    log.Debug("Beginning placement cycle")

    // Try and get some tasks/assignments
    dequeLimit := e.config.TaskDequeueLimit - len(lastRoundAssignment)
    assignments := e.taskService.Dequeue(
        ctx,
        e.config.TaskType,
        dequeLimit,
        e.config.TaskDequeueTimeOut)

    if len(assignments)+len(lastRoundAssignment) == 0 {
        return nil, _noTasksTimeoutPenalty
    }

    // process host reservation assignments
    err := e.reserver.ProcessHostReservation(
        ctx,
        assignments,
    )
    if err != nil {
        log.WithError(err).Info("error in processing host reservations")
    }

    // add the unfulfilled assignments from the last round and process
    // them in this round
    assignments = append(assignments, lastRoundAssignment...)

    // process revocable assignments
    unfulfilledAssignment := e.processAssignments(
        ctx,
        assignments,
        func(assignment models.Task) bool {
            return assignment.IsRevocable() &&
                !assignment.IsReadyForHostReservation()
        })

    // process non-revocable assignments
    unfulfilledAssignment = append(
        unfulfilledAssignment,
        e.processAssignments(
            ctx,
            assignments,
            func(assignment models.Task) bool {
                return !assignment.IsRevocable() &&
                    !assignment.IsReadyForHostReservation()
            })...)

    // TODO: Dynamically adjust this based on some signal
    return unfulfilledAssignment, e.config.TaskDequeuePeriod
}

// processAssignments processes assignments by creating the correct host filters
// and then finding hosts to place them on.
// It returns the assignments that could not be fulfilled.
func (e *engine) processAssignments(
    ctx context.Context,
    unassigned []models.Task,
    f func(models.Task) bool) []models.Task {
    assignments := []models.Task{}
    for _, assignment := range unassigned {
        if f(assignment) {
            assignments = append(assignments, assignment)
        }
    }
    if len(assignments) == 0 {
        return nil
    }

    unfulfilledAssignment := &concurrencySafeAssignmentSlice{}
    tasks := models.ToPluginTasks(assignments)
    tasksByNeeds := e.strategy.GroupTasksByPlacementNeeds(tasks)
    for i := range tasksByNeeds {
        group := tasksByNeeds[i]
        batch := []models.Task{}
        for _, idx := range group.Tasks {
            batch = append(batch, assignments[idx])
        }
        e.pool.Enqueue(async.JobFunc(func(context.Context) {
            unfulfilled := e.placeAssignmentGroup(ctx, group.PlacementNeeds, batch)
            unfulfilledAssignment.append(unfulfilled...)
        }))
    }

    if !e.strategy.ConcurrencySafe() {
        // Wait for all batches to be processed
        e.pool.WaitUntilProcessed()
        return unfulfilledAssignment.get()
    }

    return unfulfilledAssignment.get()
}

// placeAssignmentGroup tries to place the assignments and returns a slice of
// assignments that could not be fulfilled, which need to be retried in the
// next round.
func (e *engine) placeAssignmentGroup(
    ctx context.Context,
    needs plugins.PlacementNeeds,
    assignments []models.Task) []models.Task {
    for len(assignments) > 0 {
        log.WithFields(log.Fields{
            "needs":           needs,
            "len_assignments": len(assignments),
            "assignments":     assignments,
        }).Debug("placing assignment group")

        // Get hosts with available resources and tasks currently running.
        offers, reason := e.offerService.Acquire(
            ctx,
            e.config.FetchOfferTasks,
            e.config.TaskType,
            needs)

        existing := e.findUsedHosts(assignments)
        now := time.Now()
        for !e.pastDeadline(now, assignments) && len(offers)+len(existing) == 0 {
            time.Sleep(_noOffersTimeoutPenalty)
            offers, reason = e.offerService.Acquire(
                ctx,
                e.config.FetchOfferTasks,
                e.config.TaskType,
                needs)
            now = time.Now()
        }

        // Add any offers still assigned to any task so the offers will
        // eventually be returned or used in a placement.
        offers = append(offers, existing...)

        // We were starved for offers
        if len(offers) == 0 {
            log.WithFields(log.Fields{
                "needs":       needs,
                "assignments": assignments,
            }).Debug("failed to place tasks due to offer starvation")
            e.returnStarvedAssignments(ctx, assignments, reason)
            return nil
        }

        e.metrics.OfferGet.Inc(1)

        tasks := []plugins.Task{}
        for _, a := range assignments {
            tasks = append(tasks, a)
        }
        hosts := []plugins.Host{}
        for _, o := range offers {
            hosts = append(hosts, o)
        }

        // Delegate to the placement strategy to get the placements for these
        // tasks onto these offers.
        placements := e.strategy.GetTaskPlacements(tasks, hosts)
        for assignmentIdx, hostIdx := range placements {
            if hostIdx != -1 {
                assignments[assignmentIdx].SetPlacement(offers[hostIdx])
            }
        }

        // Filter the assignments according to whether they got assigned,
        // should be retried or were unassigned.
        assigned, retryable, unassigned := e.filterAssignments(time.Now(), assignments)

        // If the host manager returned at least as many hosts as there are
        // tasks and some tasks are still unassigned, this points to a
        // potential error in the affinity check at the host manager.
        if len(unassigned) != 0 && len(tasks) <= len(hosts) {
            var hostnames, taskIDs []string
            for _, task := range tasks {
                taskIDs = append(taskIDs, task.PelotonID())
            }
            for _, hostname := range hosts {
                hostnames = append(hostnames, hostname.ToMimirGroup().Name)
            }
            e.metrics.TaskAffinityFail.Inc(1)
            log.WithFields(log.Fields{
                "hostnames": hostnames,
                "tasks":     taskIDs,
            }).Info("Unassigned tasks even when more hosts available")
        }

        // We will retry the retryable tasks
        assignments = retryable

        log.WithFields(log.Fields{
            "needs":      needs,
            "assigned":   assigned,
            "retryable":  retryable,
            "unassigned": unassigned,
            "hosts":      offers,
        }).Debug("Finished one round placing assignment group")

        // Set placements and return unused offers and failed tasks
        e.cleanup(ctx, assigned, retryable, unassigned, offers)

        if len(retryable) != 0 && e.shouldPlaceRetryableInNextRun(retryable) {
            log.WithFields(log.Fields{
                "retryable": retryable,
            }).Info("tasks are retried in the next run of placement")
            return retryable
        }
    }
    return nil
}

// shouldPlaceRetryableInNextRun returns whether the retryable assignments
// should be retried in the next run of placement.
// Otherwise they continue to be processed in the current placement loop.
func (e *engine) shouldPlaceRetryableInNextRun(retryable []models.Task) bool {
    // If a strategy is concurrency safe, there is no need to process retryable
    // assignments in the next run, because the placement engine would not be
    // blocked by any of the retryable assignments.
    if e.strategy.ConcurrencySafe() {
        return false
    }

    // If all retryable assignments are:
    // 1. tasks which cannot find a host to run on, or
    // 2. tasks which have a desired host but are not placed on it,
    // handle the retryable tasks in the next round of placeAssignmentGroup,
    // because these tasks may need to wait a long time for their hosts to
    // become available and should not block other assignments from being
    // placed.
    count := 0
    for _, assignment := range retryable {
        if assignment.GetPlacement() == nil {
            count++
            continue
        }
        if len(assignment.PreferredHost()) != 0 &&
            assignment.PreferredHost() != assignment.GetPlacement().Hostname() {
            count++
            continue
        }
    }
    if count == len(retryable) {
        return true
    }
    return false
}

// returnStarvedAssignments returns the starved assignments back to the task service.
func (e *engine) returnStarvedAssignments(
    ctx context.Context,
    failedAssignments []models.Task,
    reason string) {
    e.metrics.OfferStarved.Inc(1)
    // set the same reason for the failed assignments
    for _, a := range failedAssignments {
        a.SetPlacementFailure(reason)
    }
    e.taskService.SetPlacements(ctx, nil, failedAssignments)
}

// filterAssignments filters the assignments into three groups:
// 1. assigned  : successful assignments.
// 2. retryable : should be retried, either because we can find a
//                better host or we couldn't find a host.
// 3. unassigned: tried enough times, we couldn't find a host.
func (e *engine) filterAssignments(
    now time.Time,
    assignments []models.Task) (
    assigned, retryable, unassigned []models.Task) {
    var pending []models.Task
    hostAssigned := make(map[string]struct{})

    for _, assignment := range assignments {
        if assignment.GetPlacement() == nil {
            // we haven't found an assignment yet
            if assignment.IsPastDeadline(now) {
                // tried enough
                unassigned = append(unassigned, assignment)
                continue
            }
        } else {
            hostname := assignment.GetPlacement().Hostname()
            // found a host
            assignment.IncRounds()
            // lets check if we can find a better one
            if e.isAssignmentGoodEnough(assignment, now) {
                // tried enough, this match is good enough
                assigned = append(assigned, assignment)
                hostAssigned[hostname] = struct{}{}
                continue
            }
        }
        // If we come here we have either
        // 1) found a host but we can try and find a better match
        // 2) we haven't found a host but we have time to find another one
        pending = append(pending, assignment)
    }

    for _, assignment := range pending {
        if assignment.GetPlacement() == nil {
            retryable = append(retryable, assignment)
        } else if _, ok := hostAssigned[assignment.GetPlacement().Hostname()]; ok {
            if len(assignment.PreferredHost()) == 0 {
                // If the host is already used by one of the
                // assigned tasks, launch the task on that host.
                // Treating the assignment as retryable can be
                // problematic because the host would not be in PLACING
                // state, and if PE still decides to launch the task
                // on that host, an error would be returned.
                assigned = append(assigned, assignment)
            } else {
                // For a task not placed on its desired host, reset the
                // host offer and add it to retryable. Try to find
                // the desired host in the next rounds.
                assignment.SetPlacement(nil)
                retryable = append(retryable, assignment)
            }
        } else {
            retryable = append(retryable, assignment)
        }
    }

    return assigned, retryable, unassigned
}

// isAssignmentGoodEnough returns true if we have tried past the max number of
// rounds, reached the deadline, or the task is already placed on its desired
// host.
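// Tasks with a preferred host are not bound by the max-rounds limit: they
// keep waiting for their preferred host until the placement deadline passes.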
func (e *engine) isAssignmentGoodEnough(
    task models.Task,
    now time.Time,
) bool {
    if len(task.PreferredHost()) == 0 {
        return task.IsPastMaxRounds() || task.IsPastDeadline(now)
    }

    // If a task has a desired host, it tries to get placed on that host
    // until it passes the desired-host placement deadline.
    return task.PreferredHost() == task.GetPlacement().Hostname() ||
        task.IsPastDeadline(now)
}

// findUsedHosts will find the hosts that are used by the retryable assignments.
func (e *engine) findUsedHosts(
    retryable []models.Task) []models.Offer {
    var used []models.Offer
    for _, assignment := range retryable {
        if offer := assignment.GetPlacement(); offer != nil {
            used = append(used, offer)
        }
    }
    return used
}

// findUnusedHosts will find the hosts that are unused by the assigned and
// retryable assignments.
func (e *engine) findUnusedHosts(
    assigned, retryable []models.Task,
    hosts []models.Offer) []models.Offer {
    assignments := make([]models.Task, 0, len(assigned)+len(retryable))
    assignments = append(assignments, assigned...)
    assignments = append(assignments, retryable...)

    // For each offer determine if any tasks were assigned to it.
    usedOffers := map[string]struct{}{}
    for _, placement := range assignments {
        offer := placement.GetPlacement()
        if offer == nil {
            continue
        }
        usedOffers[offer.ID()] = struct{}{}
    }

    // Find the unused hosts
    unusedOffers := []models.Offer{}
    for _, offer := range hosts {
        if _, used := usedOffers[offer.ID()]; !used {
            unusedOffers = append(unusedOffers, offer)
        }
    }
    return unusedOffers
}

// cleanup sends the assigned and unassigned placements to the task service
// and releases any offers left unused by the assigned and retryable tasks.
func (e *engine) cleanup(
    ctx context.Context,
    assigned, retryable, unassigned []models.Task,
    offers []models.Offer) {

    // Create the resource manager placements.
    e.taskService.SetPlacements(
        ctx,
        assigned,
        unassigned,
    )

    // Find the unused offers.
    unusedOffers := e.findUnusedHosts(assigned, retryable, offers)

    if len(unusedOffers) > 0 {
        // Release the unused offers.
        e.offerService.Release(ctx, unusedOffers)
    }
}

// pastDeadline returns true if every assignment is past its placement deadline.
func (e *engine) pastDeadline(now time.Time, assignments []models.Task) bool {
    for _, assignment := range assignments {
        if !assignment.IsPastDeadline(now) {
            return false
        }
    }
    return true
}

// concurrencySafeAssignmentSlice is a task slice that can be appended to and
// read concurrently.
type concurrencySafeAssignmentSlice struct {
    sync.RWMutex
    slice []models.Task
}

func (s *concurrencySafeAssignmentSlice) append(a ...models.Task) {
    s.Lock()
    s.slice = append(s.slice, a...)
    s.Unlock()
}

func (s *concurrencySafeAssignmentSlice) get() []models.Task {
    s.RLock()
    defer s.RUnlock()
    return s.slice
}
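// Illustrative usage sketch (the variable names below are placeholders; the
// config, services, strategy and worker pool are assumed to be constructed
// elsewhere):
//
//	engine := placement.New(
//		rootScope,        // tally.Scope used for metrics
//		placementConfig,  // *config.PlacementConfig (task type, dequeue limits, periods)
//		offerService,     // offers.Service for acquiring and releasing host offers
//		taskService,      // tasks.Service for dequeueing tasks and setting placements
//		hostsService,     // hosts.Service used by the reserver
//		strategy,         // plugins.Strategy implementation
//		asyncPool,        // *async.Pool shared worker pool
//	)
//	engine.Start()
//	defer engine.Stop()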