pkg/resmgr/task/tracker.go (437 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package task import ( "sync" mesos "github.com/uber/peloton/.gen/mesos/v1" "github.com/uber/peloton/.gen/peloton/api/v0/peloton" "github.com/uber/peloton/.gen/peloton/api/v0/task" "github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc" "github.com/uber/peloton/.gen/peloton/private/resmgr" "github.com/uber/peloton/pkg/common/eventstream" "github.com/uber/peloton/pkg/common/util" "github.com/uber/peloton/pkg/resmgr/respool" "github.com/uber/peloton/pkg/resmgr/scalar" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/uber-go/tally" ) // Tracker is the interface for resource manager to // track all the tasks // TODO: Get rid of peloton-task-id from tracker type Tracker interface { // AddTask adds the task to state machine AddTask( t *resmgr.Task, handler *eventstream.Handler, respool respool.ResPool, config *Config) error // GetTask gets the RM task for taskID GetTask(t *peloton.TaskID) *RMTask // SetPlacementHost Sets the placement for the tasks. SetPlacement(placement *resmgr.Placement) // DeleteTask deletes the task from the map DeleteTask(t *peloton.TaskID) // MarkItDone marks the task done and add back those // resources to respool MarkItDone(mesosTaskID string) error // MarkItInvalid marks the task done and invalidate them // in to respool by that they can be removed from the queue MarkItInvalid(mesosTaskID string) error // TasksByHosts returns all tasks of the given type running on the given hosts. TasksByHosts(hosts []string, taskType resmgr.TaskType) map[string][]*RMTask // AddResources adds the task resources to respool AddResources(taskID *peloton.TaskID) error // GetSize returns the number of the tasks in tracker GetSize() int64 // Clear cleans the tracker with all the tasks Clear() // GetActiveTasks returns task states map GetActiveTasks(jobID string, respoolID string, states []string) map[string][]*RMTask // UpdateMetrics updates the task metrics UpdateMetrics(from task.TaskState, to task.TaskState, taskResources *scalar.Resources) // GetOrphanTask gets the orphan RMTask for the given mesos-task-id GetOrphanTask(mesosTaskID string) *RMTask // GetOrphanTasks returns orphan tasks GetOrphanTasks(respoolID string) []*RMTask } // tracker is the rmtask tracker // map[taskid]*rmtask // TODO: Get rid of peloton-task-id from tracker type tracker struct { lock sync.RWMutex // Map of peloton task ID to the resource manager task tasks map[string]*RMTask // Maps hostname -> task type -> mesos task id -> rm task placements map[string]map[resmgr.TaskType]map[string]*RMTask // Map of mesos task ID to rm task // Orphan tasks are those whose resources are not released but // are no longer tracked by the tracker (since the RMTask in tracker // was replaced by a RMTask with new mesos task id). When we receive a // terminal event for a task that is not present in the tracker, // we use this map to release held resources, if any. // TODO: Move `placements` and `orphanTasks` out of tracker orphanTasks map[string]*RMTask scope tally.Scope metrics *Metrics // mutex for the task state metrics metricsLock sync.Mutex // map of task state to the count of tasks in the tracker counters map[task.TaskState]float64 // map of task state to the resources held resourcesHeldByTaskState map[task.TaskState]*scalar.Resources // host manager client hostMgrClient hostsvc.InternalHostServiceYARPCClient } // singleton object var rmtracker *tracker // InitTaskTracker initialize the task tracker func InitTaskTracker( parent tally.Scope, config *Config) { if rmtracker != nil { log.Info("Resource Manager Tracker is already initialized") return } scope := parent.SubScope("tracker") rmtracker = &tracker{ tasks: make(map[string]*RMTask), placements: map[string]map[resmgr.TaskType]map[string]*RMTask{}, orphanTasks: make(map[string]*RMTask), metrics: NewMetrics(scope), scope: scope, counters: make(map[task.TaskState]float64), resourcesHeldByTaskState: make(map[task.TaskState]*scalar.Resources), } // Initialize resources held by each non-terminal task state to zero resource. for s := range task.TaskState_name { taskState := task.TaskState(s) // skip terminal states if !util.IsPelotonStateTerminal(taskState) { rmtracker.resourcesHeldByTaskState[taskState] = scalar.ZeroResource } } // Checking placement back off is enabled , if yes then initialize // policy factory. Explicitly checking, anything related to // back off policies should come inside this code path. if config.EnablePlacementBackoff { err := InitPolicyFactory() if err != nil { log.Error("Error initializing back off policy") } } log.Info("Resource Manager Tracker is initialized") } // GetTracker gets the singleton object of the tracker func GetTracker() Tracker { if rmtracker == nil { log.Fatal("Tracker is not initialized") } return rmtracker } // AddTask adds task to resmgr task tracker func (tr *tracker) AddTask( t *resmgr.Task, handler *eventstream.Handler, respool respool.ResPool, config *Config, ) error { rmTask, err := CreateRMTask( tr.scope.SubScope("rmtask"), t, handler, respool, config) if err != nil { return err } tr.lock.Lock() defer tr.lock.Unlock() if prevRMTask, ok := tr.tasks[rmTask.task.GetId().GetValue()]; ok && prevRMTask.task.GetTaskId().GetValue() != rmTask.task.GetTaskId().GetValue() { // If EnqueueGangs request for a new run of the task is received // before the terminal event for the last run is processed by resmgr // mark the prev RMTask as an orphan task so that we can release // resources when the terminal event is processed tr.orphanTasks[prevRMTask.task.GetTaskId().GetValue()] = prevRMTask tr.metrics.OrphanTasks.Update(float64(len(tr.orphanTasks))) } tr.tasks[rmTask.task.GetId().GetValue()] = rmTask if rmTask.task.Hostname != "" { tr.setPlacement(rmTask.task.GetTaskId(), rmTask.task.GetHostname()) } tr.metrics.TasksCountInTracker.Update(float64(tr.GetSize())) return nil } // GetTask gets the RM task for taskID // this locks the tracker and get the Task func (tr *tracker) GetTask(t *peloton.TaskID) *RMTask { tr.lock.RLock() defer tr.lock.RUnlock() return tr.getTask(t) } // getTask gets the RM task for taskID // this method is not protected, we need to lock tracker // before we use this func (tr *tracker) getTask(t *peloton.TaskID) *RMTask { if rmTask, ok := tr.tasks[t.Value]; ok { return rmTask } return nil } // setPlacement writes the host:task mapping for the given hostname and mesos-task-id // in the placements map of the tracker. Before writing to placements map it checks // if the task is present in the tracker. If not present, the mapping is not set func (tr *tracker) setPlacement(t *mesos.TaskID, hostname string) { taskID, err := util.ParseTaskIDFromMesosTaskID(t.GetValue()) if err != nil { log.WithError(err). Error("error while setting placement") return } rmTask, ok := tr.tasks[taskID] if !ok { return } rmTask.task.Hostname = hostname if _, exists := tr.placements[hostname]; !exists { tr.placements[hostname] = map[resmgr.TaskType]map[string]*RMTask{} } if _, exists := tr.placements[hostname][rmTask.task.GetType()]; !exists { tr.placements[hostname][rmTask.task.GetType()] = map[string]*RMTask{} } if _, exists := tr.placements[hostname][rmTask.task.GetType()][t.GetValue()]; !exists { tr.placements[hostname][rmTask.task.GetType()][t.GetValue()] = rmTask } } // clearPlacement will remove the task from the placements map. func (tr *tracker) clearPlacement( hostname string, taskType resmgr.TaskType, mesosTaskID string, ) { if hostname == "" { return } placements := tr.placements[hostname] delete(placements[taskType], mesosTaskID) if len(placements[taskType]) == 0 { delete(placements, taskType) } if len(tr.placements[hostname]) == 0 { delete(tr.placements, hostname) } } // SetPlacementHost will set the hostname that the task is currently placed on. func (tr *tracker) SetPlacement(placement *resmgr.Placement) { tr.lock.Lock() defer tr.lock.Unlock() for _, t := range placement.GetTaskIDs() { tr.setPlacement(t.GetMesosTaskID(), placement.GetHostname()) } } // DeleteTask deletes the task from the map after // locking the tracker , this is interface call func (tr *tracker) DeleteTask(t *peloton.TaskID) { tr.lock.Lock() defer tr.lock.Unlock() tr.deleteTask(t) } // deleteTask deletes the task from the map // this method is not protected, we need to lock tracker // before we use this. func (tr *tracker) deleteTask(t *peloton.TaskID) { rmTask, exists := tr.tasks[t.Value] if !exists { return } tr.clearPlacement( rmTask.task.GetHostname(), rmTask.task.GetType(), rmTask.task.GetTaskId().GetValue(), ) delete(tr.tasks, t.Value) tr.metrics.TasksCountInTracker.Update(float64(tr.GetSize())) } // MarkItDone updates the resources in resmgr and removes the task // from the tracker func (tr *tracker) MarkItDone( mesosTaskID string) error { tr.lock.Lock() defer tr.lock.Unlock() return tr.markItDone(mesosTaskID) } // MarkItInvalid marks the task done and invalidate them // in to respool by that they can be removed from the queue func (tr *tracker) MarkItInvalid(mesosTaskID string) error { tr.lock.Lock() defer tr.lock.Unlock() taskID, err := util.ParseTaskIDFromMesosTaskID(mesosTaskID) if err != nil { return err } tID := &peloton.TaskID{Value: taskID} t := tr.getTask(tID) // remove from the tracker if err = tr.markItDone(mesosTaskID); err != nil { return err } if t == nil { return nil } switch t.GetCurrentState().State { case task.TaskState_PENDING, task.TaskState_INITIALIZED: // If task is in INITIALIZED or PENDING state we need to invalidate // it from in pending queue t.respool.AddInvalidTask(tID) case task.TaskState_READY: // If task is in READY state we need to invalidate // it from in ready queue GetScheduler().AddInvalidTask(tID) } return nil } // tracker needs to be locked before calling this. func (tr *tracker) markItDone(mesosTaskID string) error { taskID, err := util.ParseTaskIDFromMesosTaskID(mesosTaskID) if err != nil { return err } tID := &peloton.TaskID{Value: taskID} t := tr.getTask(tID) if t == nil || t.Task().GetTaskId().GetValue() != mesosTaskID { // If task is not in tracker or if the mesos ID has changed, clear the // placement and free up the held resources. // 1. `task not in tracker` - This can happen if resource manager receives // the task event for an older run after the latest run of the task // has been cleaned up from tracker. // 2. `mesos ID has changed` - This can happen when jobmgr processes the // mesos event faster than resmgr causing EnqueueGangs to be called // before the task termination event is processed by resmgr. return tr.deleteOrphanTask(mesosTaskID) } // We need to skip the tasks from resource counting which are in pending and // and initialized state if !(t.GetCurrentState().State == task.TaskState_PENDING || t.GetCurrentState().State == task.TaskState_INITIALIZED) { err := t.respool.SubtractFromAllocation(scalar.GetTaskAllocation(t.Task())) if err != nil { return errors.Errorf("failed update task %s ", taskID) } tr.metricsLock.Lock() defer tr.metricsLock.Unlock() // update metrics taskState := t.GetCurrentState().State if val, ok := tr.resourcesHeldByTaskState[taskState]; ok { tr.resourcesHeldByTaskState[taskState] = val.Subtract( scalar.ConvertToResmgrResource(t.task.GetResource()), ) } // publish metrics if gauge, ok := tr.metrics.ResourcesHeldByTaskState[taskState]; ok { gauge.Update(tr.resourcesHeldByTaskState[taskState]) } } // terminate the rm task t.Terminate() log.WithField("task_id", t.Task().GetTaskId().GetValue()). Info("Deleting the task from Tracker") tr.deleteTask(tID) return nil } // TasksByHosts returns all tasks of the given type running on the given hosts. func (tr *tracker) TasksByHosts(hosts []string, taskType resmgr.TaskType) map[string][]*RMTask { tr.lock.RLock() defer tr.lock.RUnlock() result := map[string][]*RMTask{} var types []resmgr.TaskType if taskType == resmgr.TaskType_UNKNOWN { for t := range resmgr.TaskType_name { types = append(types, resmgr.TaskType(t)) } } else { types = append(types, taskType) } for _, hostname := range hosts { for _, tType := range types { for _, rmTask := range tr.placements[hostname][tType] { result[hostname] = append(result[hostname], rmTask) } } } return result } // AddResources adds the task resources to respool func (tr *tracker) AddResources( tID *peloton.TaskID) error { rmTask := tr.GetTask(tID) if rmTask == nil { return errors.Errorf("rmTask %s is not in tracker", tID) } res := scalar.ConvertToResmgrResource(rmTask.Task().GetResource()) err := rmTask.respool.AddToAllocation(scalar.GetTaskAllocation(rmTask.Task())) if err != nil { return errors.Errorf("Not able to add resources for "+ "rmTask %s for respool %s ", tID, rmTask.respool.Name()) } tr.metricsLock.Lock() defer tr.metricsLock.Unlock() taskState := rmTask.GetCurrentState().State if val, ok := tr.resourcesHeldByTaskState[taskState]; ok { tr.resourcesHeldByTaskState[taskState] = val.Add(res) } // publish metrics if gauge, ok := tr.metrics.ResourcesHeldByTaskState[taskState]; ok { gauge.Update(tr.resourcesHeldByTaskState[taskState]) } log.WithFields(log.Fields{ "respool_id": rmTask.respool.ID(), "resources": res, }).Debug("Added resources to Respool") return nil } // GetSize gets the number of tasks in tracker func (tr *tracker) GetSize() int64 { return int64(len(tr.tasks)) } // Clear cleans the tracker with all the existing tasks func (tr *tracker) Clear() { tr.lock.Lock() defer tr.lock.Unlock() // Clearing the tasks for k := range tr.tasks { delete(tr.tasks, k) } // Clearing the placements for k := range tr.placements { delete(tr.placements, k) } // Clearing the orphan tasks for k := range tr.orphanTasks { delete(tr.orphanTasks, k) } // publish metrics tr.metrics.OrphanTasks.Update(float64(len(tr.orphanTasks))) } // GetActiveTasks returns task to states map, if jobID or respoolID is provided, // only tasks for that job or respool will be returned func (tr *tracker) GetActiveTasks( jobID string, respoolID string, states []string) map[string][]*RMTask { tr.lock.RLock() defer tr.lock.RUnlock() taskStates := make(map[string][]*RMTask) for _, t := range filterTasks(tr.tasks, jobID, respoolID, states) { taskState := t.GetCurrentState().State.String() taskStates[taskState] = append(taskStates[taskState], t) } for _, t := range filterTasks(tr.orphanTasks, jobID, respoolID, states) { taskState := t.GetCurrentState().State.String() taskStates[taskState] = append(taskStates[taskState], t) } return taskStates } // UpdateMetrics updates the task metrics. This can be called from // multiple goroutines. func (tr *tracker) UpdateMetrics( from task.TaskState, to task.TaskState, taskResources *scalar.Resources, ) { tr.metricsLock.Lock() defer tr.metricsLock.Unlock() // Reducing the count from state tr.counters[from]-- if tr.counters[from] < 0 { tr.counters[from] = 0 } // Incrementing the state counter to +1 tr.counters[to]++ // Subtract resources from 'from' state if res, ok := tr.resourcesHeldByTaskState[from]; ok { tr.resourcesHeldByTaskState[from] = res.Subtract(taskResources) } // Add resources to 'to' state if res, ok := tr.resourcesHeldByTaskState[to]; ok { tr.resourcesHeldByTaskState[to] = res.Add(taskResources) } // publish metrics if gauge, ok := tr.metrics.TaskStatesGauge[from]; ok { gauge.Update(tr.counters[from]) } if gauge, ok := tr.metrics.TaskStatesGauge[to]; ok { gauge.Update(tr.counters[to]) } if gauge, ok := tr.metrics.ResourcesHeldByTaskState[from]; ok { gauge.Update(tr.resourcesHeldByTaskState[from]) } if gauge, ok := tr.metrics.ResourcesHeldByTaskState[to]; ok { gauge.Update(tr.resourcesHeldByTaskState[to]) } } // GetOrphanTask gets the orphan RMTask for the given mesos-task-id func (tr *tracker) GetOrphanTask(mesosTaskID string) *RMTask { tr.lock.RLock() defer tr.lock.RUnlock() if rmTask, ok := tr.orphanTasks[mesosTaskID]; ok { return rmTask } return nil } // GetOrphanTasks returns all orphan tasks known to resource manager func (tr *tracker) GetOrphanTasks(respoolID string) []*RMTask { tr.lock.RLock() defer tr.lock.RUnlock() return filterTasks(tr.orphanTasks, "", respoolID, nil) } // deleteOrphanTask is a helper that cleans up the task from the // host-to-tasks map and releases the resources held by the task func (tr *tracker) deleteOrphanTask(mesosTaskID string) error { rmTask, ok := tr.orphanTasks[mesosTaskID] if !ok { // If the mesos task ID is not a known orphan task then // it means there are no resources held for this task. // We can simply return here return nil } tr.clearPlacement( rmTask.task.GetHostname(), rmTask.task.GetType(), mesosTaskID, ) err := rmTask.respool.SubtractFromAllocation(scalar.GetTaskAllocation(rmTask.task)) if err != nil { log.WithField("mesos_task", mesosTaskID). WithField("resources", rmTask.task.GetResource()). WithError(err). Error("failed to release held resources for task") err = errors.Wrapf(err, "failed to release held resources for task %s", mesosTaskID) } delete(tr.orphanTasks, mesosTaskID) // update metrics tr.metricsLock.Lock() defer tr.metricsLock.Unlock() taskState := rmTask.GetCurrentState().State if val, ok := tr.resourcesHeldByTaskState[taskState]; ok { tr.resourcesHeldByTaskState[taskState] = val.Subtract( scalar.ConvertToResmgrResource(rmTask.task.GetResource()), ) } // publish metrics if gauge, ok := tr.metrics.ResourcesHeldByTaskState[taskState]; ok { gauge.Update(tr.resourcesHeldByTaskState[taskState]) } log.WithFields(log.Fields{ "orphan_task": mesosTaskID, }).Debug("Orphan task deleted") tr.metrics.OrphanTasks.Update(float64(len(tr.orphanTasks))) return err } // filterTasks filters the tasks based on the jobID, respoolID and states filters func filterTasks( tasks map[string]*RMTask, jobID string, respoolID string, states []string, ) []*RMTask { var result []*RMTask for _, t := range tasks { // filter by jobID if jobID != "" && t.Task().GetJobId().GetValue() != jobID { continue } // filter by resource pool ID if respoolID != "" && t.Respool().ID() != respoolID { continue } // filter by task states if len(states) > 0 && !util.Contains(states, t.GetCurrentState().State.String()) { continue } result = append(result, t) } return result }