pkg/resmgr/handler.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package resmgr
import (
"context"
"fmt"
"reflect"
"sync/atomic"
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
t "github.com/uber/peloton/.gen/peloton/api/v0/task"
pb_eventstream "github.com/uber/peloton/.gen/peloton/private/eventstream"
"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"
"github.com/uber/peloton/.gen/peloton/private/resmgr"
"github.com/uber/peloton/.gen/peloton/private/resmgrsvc"
"github.com/uber/peloton/pkg/resmgr/hostmover"
"github.com/uber/peloton/pkg/common"
"github.com/uber/peloton/pkg/common/eventstream"
"github.com/uber/peloton/pkg/common/queue"
"github.com/uber/peloton/pkg/common/statemachine"
"github.com/uber/peloton/pkg/common/util"
"github.com/uber/peloton/pkg/resmgr/preemption"
r_queue "github.com/uber/peloton/pkg/resmgr/queue"
"github.com/uber/peloton/pkg/resmgr/respool"
"github.com/uber/peloton/pkg/resmgr/scalar"
rmtask "github.com/uber/peloton/pkg/resmgr/task"
multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/uber-go/tally"
"go.uber.org/yarpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
errFailingGangMemberTask = errors.New("task failed because another gang member failed")
errSameTaskPresent = errors.New("same task present in tracker, ignoring new task")
errGangNotEnqueued = errors.New("could not enqueue gang to ready after retry")
errEnqueuedAgain = errors.New("enqueued again after retry")
errRequeueTaskFailed = errors.New("requeue existing task to resmgr failed")
)
const (
_reasonPlacementReceived = "placement received"
_reasonDequeuedForLaunch = "placement dequeued, waiting for launch"
)
const _eventStreamBufferSize = 1000
// ServiceHandler implements peloton.private.resmgr.ResourceManagerService
type ServiceHandler struct {
// the handler config
config Config
// metrics and scope
scope tally.Scope
metrics *Metrics
// queues for preempting and placing tasks
preemptionQueue preemption.Queue
placements queue.Queue
// handler for host manager event stream
maxOffset *uint64
eventStreamHandler *eventstream.Handler
// rmtasks tracker
rmTracker rmtask.Tracker
// batch host scorer
batchScorer hostmover.Scorer
// in-memory resource pool tree
resPoolTree respool.Tree
hostmgrClient hostsvc.InternalHostServiceYARPCClient
}
// NewServiceHandler initializes the handler for ResourceManagerService
func NewServiceHandler(
d *yarpc.Dispatcher,
parent tally.Scope,
rmTracker rmtask.Tracker,
batchScorer hostmover.Scorer,
tree respool.Tree,
preemptionQueue preemption.Queue,
hostmgrClient hostsvc.InternalHostServiceYARPCClient,
conf Config) *ServiceHandler {
var maxOffset uint64
handler := &ServiceHandler{
metrics: NewMetrics(parent.SubScope("resmgr")),
resPoolTree: tree,
placements: queue.NewQueue(
"placement-queue",
reflect.TypeOf(resmgr.Placement{}),
maxPlacementQueueSize,
),
rmTracker: rmTracker,
batchScorer: batchScorer,
preemptionQueue: preemptionQueue,
maxOffset: &maxOffset,
config: conf,
scope: parent,
eventStreamHandler: initEventStreamHandler(
d,
_eventStreamBufferSize,
parent.SubScope("resmgr")),
hostmgrClient: hostmgrClient,
}
d.Register(resmgrsvc.BuildResourceManagerServiceYARPCProcedures(handler))
return handler
}
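// initEventStreamHandler creates the event stream handler used to publish
// task status updates, with the job manager and the resource manager as the
// expected consumers, and registers it with the dispatcher.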
func initEventStreamHandler(d *yarpc.Dispatcher, bufferSize int, parentScope tally.Scope) *eventstream.Handler {
eventStreamHandler := eventstream.NewEventStreamHandler(
bufferSize,
[]string{
common.PelotonJobManager,
common.PelotonResourceManager,
},
nil,
parentScope)
d.Register(pb_eventstream.BuildEventStreamServiceYARPCProcedures(eventStreamHandler))
return eventStreamHandler
}
// GetStreamHandler returns the stream handler
func (h *ServiceHandler) GetStreamHandler() *eventstream.Handler {
return h.eventStreamHandler
}
// EnqueueGangs implements ResourceManagerService.EnqueueGangs
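//
// A minimal caller-side sketch (illustrative only; the generated YARPC client
// constructor, the outbound name "peloton-resmgr", and the variables d, ctx
// and tasks are assumptions, not taken from this file):
//
//	client := resmgrsvc.NewResourceManagerServiceYARPCClient(
//		d.ClientConfig("peloton-resmgr"))
//	resp, err := client.EnqueueGangs(ctx, &resmgrsvc.EnqueueGangsRequest{
//		ResPool: &peloton.ResourcePoolID{Value: "my-respool-id"},
//		Gangs:   []*resmgrsvc.Gang{{Tasks: tasks}},
//	})
//	if err != nil || resp.GetError() != nil {
//		// handle transport errors or per-gang enqueue failures
//	}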
func (h *ServiceHandler) EnqueueGangs(
ctx context.Context,
req *resmgrsvc.EnqueueGangsRequest,
) (*resmgrsvc.EnqueueGangsResponse, error) {
log.WithField("request", req).Info("EnqueueGangs called.")
h.metrics.APIEnqueueGangs.Inc(1)
var err error
var resourcePool respool.ResPool
respoolID := req.GetResPool()
if respoolID == nil {
return &resmgrsvc.EnqueueGangsResponse{
Error: &resmgrsvc.EnqueueGangsResponse_Error{
NotFound: &resmgrsvc.ResourcePoolNotFound{
Id: respoolID,
Message: "resource pool ID can't be nil",
},
},
}, nil
}
// Lookup respool from the resource pool tree
resourcePool, err = h.resPoolTree.Get(respoolID)
if err != nil {
h.metrics.EnqueueGangFail.Inc(1)
return &resmgrsvc.EnqueueGangsResponse{
Error: &resmgrsvc.EnqueueGangsResponse_Error{
NotFound: &resmgrsvc.ResourcePoolNotFound{
Id: respoolID,
Message: err.Error(),
},
},
}, nil
}
var failedGangs []*resmgrsvc.EnqueueGangsFailure_FailedTask
// Enqueue the gangs sent in an API call to the pending queue of the respool.
// For each gang, add its tasks to the state machine, enqueue the gang, and
// return per-task success/failure.
for _, gang := range req.GetGangs() {
failedGang, err := h.enqueueGang(gang, resourcePool)
if err != nil {
failedGangs = append(failedGangs, failedGang...)
h.metrics.EnqueueGangFail.Inc(1)
continue
}
h.metrics.EnqueueGangSuccess.Inc(1)
}
// Even if only one gang fails, we return an error in the response.
if len(failedGangs) > 0 {
return &resmgrsvc.EnqueueGangsResponse{
Error: &resmgrsvc.EnqueueGangsResponse_Error{
Failure: &resmgrsvc.EnqueueGangsFailure{
Failed: failedGangs,
},
},
}, nil
}
log.Debug("Enqueue Returned")
return &resmgrsvc.EnqueueGangsResponse{}, nil
}
// enqueueGang adds a new gang to the pending queue, or requeues
// the gang if its tasks have a different mesos task id.
func (h *ServiceHandler) enqueueGang(
gang *resmgrsvc.Gang,
respool respool.ResPool) (
[]*resmgrsvc.EnqueueGangsFailure_FailedTask,
error) {
var failed []*resmgrsvc.EnqueueGangsFailure_FailedTask
var failedTask *resmgrsvc.EnqueueGangsFailure_FailedTask
var err error
failedTasks := make(map[string]bool)
for _, task := range gang.GetTasks() {
if !(h.isTaskPresent(task)) {
// If the task is not present in the tracker,
// it is a new task and needs to be added to
// the tracker.
failedTask, err = h.addTask(task, respool)
} else {
// The task is already present in the tracker;
// check whether it has the same or a different
// mesos task id.
failedTask, err = h.requeueTask(task, respool)
}
// If there is any failure, add the task to the failed
// list so that the gang can be removed later.
if err != nil {
failed = append(failed, failedTask)
failedTasks[task.Id.Value] = true
}
}
if len(failed) == 0 {
err = h.addingGangToPendingQueue(gang, respool)
// if there is an error, mark all tasks in the gang as failed.
if err != nil {
failed = append(failed, h.markingTasksFailInGang(gang, failedTasks, errFailingGangMemberTask)...)
err = errGangNotEnqueued
}
return failed, err
}
// Fail the remaining tasks that have not failed yet: a gang is
// admitted all-or-nothing, so every task in the gang must be
// either enqueued or requeued together.
failed = append(failed, h.markingTasksFailInGang(gang, failedTasks, errFailingGangMemberTask)...)
err = errGangNotEnqueued
return failed, err
}
// isTaskPresent checks whether the task is present in the tracker.
// It returns true if present, false otherwise.
func (h *ServiceHandler) isTaskPresent(requeuedTask *resmgr.Task) bool {
return h.rmTracker.GetTask(requeuedTask.Id) != nil
}
// removeGangFromTracker removes all tasks of the gang from the tracker
func (h *ServiceHandler) removeGangFromTracker(gang *resmgrsvc.Gang) {
for _, task := range gang.Tasks {
h.rmTracker.DeleteTask(task.Id)
}
}
// addingGangToPendingQueue transitions all tasks of the gang to the PENDING
// state and adds the gang to the pending queue so that it can be scheduled
// in the next scheduling cycle.
func (h *ServiceHandler) addingGangToPendingQueue(
gang *resmgrsvc.Gang,
respool respool.ResPool) error {
for _, task := range gang.GetTasks() {
if h.rmTracker.GetTask(task.Id) != nil {
// transition the task from the INITIALIZED state to the PENDING state
err := h.rmTracker.GetTask(task.Id).TransitTo(
t.TaskState_PENDING.String(), statemachine.WithReason("enqueue gangs called"),
statemachine.WithInfo(mesosTaskID, *task.GetTaskId().Value))
if err != nil {
log.WithError(err).WithField("task", task.Id.Value).
Error("not able to transit task to PENDING")
// Removing the gang from the tracker if some tasks fail to transit
h.removeGangFromTracker(gang)
return errGangNotEnqueued
}
}
}
// Adding gang to pending queue
if err := respool.EnqueueGang(gang); err != nil {
// We need to remove gang tasks from tracker
h.removeGangFromTracker(gang)
return errGangNotEnqueued
}
return nil
}
// markingTasksFailInGang marks as failed all tasks in the gang that are not
// already in the failedTasks map and returns the failed list.
func (h *ServiceHandler) markingTasksFailInGang(gang *resmgrsvc.Gang,
failedTasks map[string]bool,
err error,
) []*resmgrsvc.EnqueueGangsFailure_FailedTask {
var failed []*resmgrsvc.EnqueueGangsFailure_FailedTask
for _, task := range gang.GetTasks() {
if _, ok := failedTasks[task.Id.Value]; !ok {
failed = append(failed,
&resmgrsvc.EnqueueGangsFailure_FailedTask{
Task: task,
Message: err.Error(),
Errorcode: resmgrsvc.EnqueueGangsFailure_ENQUEUE_GANGS_FAILURE_ERROR_CODE_FAILED_DUE_TO_GANG_FAILED,
})
}
}
return failed
}
// addTask adds the task to RMTracker based on the respool
func (h *ServiceHandler) addTask(newTask *resmgr.Task, respool respool.ResPool,
) (*resmgrsvc.EnqueueGangsFailure_FailedTask, error) {
// Adding task to state machine
err := h.rmTracker.AddTask(
newTask,
h.eventStreamHandler,
respool,
h.config.RmTaskConfig,
)
if err != nil {
return &resmgrsvc.EnqueueGangsFailure_FailedTask{
Task: newTask,
Message: err.Error(),
Errorcode: resmgrsvc.EnqueueGangsFailure_ENQUEUE_GANGS_FAILURE_ERROR_CODE_INTERNAL,
}, err
}
return nil, nil
}
// requeueTask checks whether the enqueued task has the same mesos task id
// as the tracked task.
// If it has the same mesos task id => return an error.
// If it has a different mesos task id, check the current state and act
// accordingly.
func (h *ServiceHandler) requeueTask(
requeuedTask *resmgr.Task,
respool respool.ResPool,
) (*resmgrsvc.EnqueueGangsFailure_FailedTask, error) {
rmTask := h.rmTracker.GetTask(requeuedTask.GetId())
if rmTask == nil {
return &resmgrsvc.EnqueueGangsFailure_FailedTask{
Task: requeuedTask,
Message: errRequeueTaskFailed.Error(),
Errorcode: resmgrsvc.EnqueueGangsFailure_ENQUEUE_GANGS_FAILURE_ERROR_CODE_INTERNAL,
}, errRequeueTaskFailed
}
if *requeuedTask.TaskId.Value == *rmTask.Task().TaskId.Value {
return &resmgrsvc.EnqueueGangsFailure_FailedTask{
Task: requeuedTask,
Message: errSameTaskPresent.Error(),
Errorcode: resmgrsvc.EnqueueGangsFailure_ENQUEUE_GANGS_FAILURE_ERROR_CODE_ALREADY_EXIST,
}, errSameTaskPresent
}
currentTaskState := rmTask.GetCurrentState().State
// If the state is LAUNCHED or RUNNING,
// replace the task in the tracker and requeue it.
if h.isTaskInTransitRunning(currentTaskState) {
return h.addTask(requeuedTask, respool)
}
// The task should not be in any state other than RUNNING or LAUNCHED here.
log.WithFields(log.Fields{
"task": rmTask.Task().Id.Value,
"current_state": currentTaskState.String(),
"old_mesos_task_id": *rmTask.Task().TaskId.Value,
"new_mesos_task_id": *requeuedTask.TaskId.Value,
}).Error("task should not be requeued with different mesos taskid at this state")
return &resmgrsvc.EnqueueGangsFailure_FailedTask{
Task: requeuedTask,
Message: errRequeueTaskFailed.Error(),
Errorcode: resmgrsvc.EnqueueGangsFailure_ENQUEUE_GANGS_FAILURE_ERROR_CODE_INTERNAL,
}, errRequeueTaskFailed
}
// isTaskInTransitRunning returns true if the task state is
// RUNNING or LAUNCHED, false otherwise.
func (h *ServiceHandler) isTaskInTransitRunning(state t.TaskState) bool {
return state == t.TaskState_LAUNCHED ||
state == t.TaskState_RUNNING
}
// DequeueGangs implements ResourceManagerService.DequeueGangs
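// The request timeout is interpreted in milliseconds and applied to each
// dequeue from the ready queue; at most Limit gangs are returned, and tasks
// that are no longer tracked are stripped from the returned gangs.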
func (h *ServiceHandler) DequeueGangs(
ctx context.Context,
req *resmgrsvc.DequeueGangsRequest,
) (*resmgrsvc.DequeueGangsResponse, error) {
h.metrics.APIDequeueGangs.Inc(1)
limit := req.GetLimit()
timeout := time.Duration(req.GetTimeout())
sched := rmtask.GetScheduler()
var gangs []*resmgrsvc.Gang
for i := uint32(0); i < limit; i++ {
gang, err := sched.DequeueGang(timeout*time.Millisecond, req.Type)
if err != nil {
log.WithField("task_type", req.Type).
Debug("Timeout to dequeue gang from ready queue")
h.metrics.DequeueGangTimeout.Inc(1)
break
}
tasksToRemove := make(map[string]*resmgr.Task)
for _, task := range gang.GetTasks() {
h.metrics.DequeueGangSuccess.Inc(1)
// Moving task to Placing state or Reserved state
if h.rmTracker.GetTask(task.Id) != nil {
if task.ReadyForHostReservation {
err = h.rmTracker.GetTask(task.Id).TransitTo(
t.TaskState_RESERVED.String())
} else {
// If placement backoff is enabled, add the backoff
// before the transition; otherwise just do the transition.
if h.config.RmTaskConfig.EnablePlacementBackoff {
// Add the placement backoff
h.rmTracker.GetTask(task.Id).AddBackoff()
}
err = h.rmTracker.GetTask(task.Id).TransitTo(
t.TaskState_PLACING.String())
}
if err != nil {
log.WithError(err).WithField(
"task_id", task.Id.Value).
Error("Failed to transit state " +
"for task")
}
} else {
tasksToRemove[task.Id.Value] = task
}
}
gang = h.removeFromGang(gang, tasksToRemove)
gangs = append(gangs, gang)
}
// TODO: handle the dequeue errors better
response := resmgrsvc.DequeueGangsResponse{Gangs: gangs}
log.WithField("response", response).Debug("DequeueGangs succeeded")
return &response, nil
}
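// removeFromGang strips the given tasks from the gang and returns the
// updated gang; it is used to drop tasks that are no longer in the tracker
// before the gang is returned for placement.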
func (h *ServiceHandler) removeFromGang(
gang *resmgrsvc.Gang,
tasksToRemove map[string]*resmgr.Task) *resmgrsvc.Gang {
if len(tasksToRemove) == 0 {
return gang
}
var newTasks []*resmgr.Task
for _, gt := range gang.GetTasks() {
if _, ok := tasksToRemove[gt.Id.Value]; !ok {
newTasks = append(newTasks, gt)
}
}
gang.Tasks = newTasks
return gang
}
// SetPlacements implements ResourceManagerService.SetPlacements
func (h *ServiceHandler) SetPlacements(
ctx context.Context,
req *resmgrsvc.SetPlacementsRequest,
) (*resmgrsvc.SetPlacementsResponse, error) {
log.WithField("request", req).Debug("SetPlacements called.")
h.metrics.APISetPlacements.Inc(1)
var failed []*resmgrsvc.SetPlacementsFailure_FailedPlacement
// first go through all the successful placements
for _, placement := range req.GetPlacements() {
newPlacement := h.transitTasksInPlacement(
placement,
[]t.TaskState{
t.TaskState_PLACING,
t.TaskState_RESERVED,
},
t.TaskState_PLACED,
_reasonPlacementReceived)
h.rmTracker.SetPlacement(newPlacement)
err := h.placements.Enqueue(newPlacement)
if err == nil {
h.metrics.SetPlacementSuccess.Inc(1)
continue
}
// log the error and record the failed placement
log.WithField("placement", newPlacement).
WithError(err).
Error("Failed to enqueue placement")
failed = append(
failed,
&resmgrsvc.SetPlacementsFailure_FailedPlacement{
Placement: newPlacement,
Message: err.Error(),
},
)
h.metrics.SetPlacementFail.Inc(1)
}
// now we go through all the unsuccessful placements
for _, failedPlacement := range req.GetFailedPlacements() {
err := h.returnFailedPlacement(
failedPlacement.GetGang(),
failedPlacement.GetReason(),
)
if err != nil {
log.WithField("placement", failedPlacement).
WithError(err).
Error("Failed to enqueue failed placements")
h.metrics.SetPlacementFail.Inc(1)
}
}
// if there are any failures
if len(failed) > 0 {
return &resmgrsvc.SetPlacementsResponse{
Error: &resmgrsvc.SetPlacementsResponse_Error{
Failure: &resmgrsvc.SetPlacementsFailure{
Failed: failed,
},
},
}, nil
}
h.metrics.PlacementQueueLen.Update(float64(h.placements.Length()))
log.Debug("Set Placement Returned")
return &resmgrsvc.SetPlacementsResponse{}, nil
}
// returnFailedPlacement returns a failed placement gang to the resource manager.
// The failed gangs will be retried for placement at a later time.
// There are two possible paths from here:
// 1. Put the task back on the ready queue.
// 2. Put the task back on the pending queue.
// The path is decided based on how many placement attempts have already been made.
func (h *ServiceHandler) returnFailedPlacement(
failedGang *resmgrsvc.Gang, reason string) error {
errs := new(multierror.Error)
for _, task := range failedGang.GetTasks() {
rmTask := h.rmTracker.GetTask(task.Id)
if rmTask == nil {
// task could have been deleted
continue
}
if err := rmTask.RequeueUnPlaced(reason); err != nil {
errs = multierror.Append(errs, err)
}
h.metrics.PlacementFailed.Inc(1)
}
return errs.ErrorOrNil()
}
// GetTasksByHosts returns all tasks of the given task type running on the given list of hosts.
func (h *ServiceHandler) GetTasksByHosts(ctx context.Context,
req *resmgrsvc.GetTasksByHostsRequest) (*resmgrsvc.GetTasksByHostsResponse, error) {
hostTasksMap := map[string]*resmgrsvc.TaskList{}
for hostname, tasks := range h.rmTracker.TasksByHosts(req.Hostnames, req.Type) {
if _, exists := hostTasksMap[hostname]; !exists {
hostTasksMap[hostname] = &resmgrsvc.TaskList{
Tasks: make([]*resmgr.Task, 0, len(tasks)),
}
}
for _, task := range tasks {
hostTasksMap[hostname].Tasks = append(hostTasksMap[hostname].Tasks, task.Task())
}
}
res := &resmgrsvc.GetTasksByHostsResponse{
HostTasksMap: hostTasksMap,
}
return res, nil
}
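// removeTasksFromPlacements removes the tasks whose mesos task IDs are in
// taskSet from the placement and returns the updated placement.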
func (h *ServiceHandler) removeTasksFromPlacements(
placement *resmgr.Placement,
taskSet map[string]struct{},
) *resmgr.Placement {
if len(taskSet) == 0 {
return placement
}
var newTaskIDs []*resmgr.Placement_Task
log.WithFields(log.Fields{
"tasks_to_remove": taskSet,
"orig_tasks": placement.GetTaskIDs(),
}).Debug("Removing Tasks")
for _, pt := range placement.GetTaskIDs() {
if _, ok := taskSet[pt.GetMesosTaskID().GetValue()]; !ok {
newTaskIDs = append(newTaskIDs, pt)
}
}
placement.TaskIDs = newTaskIDs
return placement
}
// GetPlacements implements ResourceManagerService.GetPlacements
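// Placements are dequeued from the internal placement queue (the request
// timeout is in milliseconds), their tasks are transitioned from PLACED to
// LAUNCHING, and tasks that fail the transition are removed from the
// returned placement.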
func (h *ServiceHandler) GetPlacements(
ctx context.Context,
req *resmgrsvc.GetPlacementsRequest,
) (*resmgrsvc.GetPlacementsResponse, error) {
log.WithField("request", req).Debug("GetPlacements called.")
h.metrics.APIGetPlacements.Inc(1)
limit := req.GetLimit()
timeout := time.Duration(req.GetTimeout())
var placements []*resmgr.Placement
for i := 0; i < int(limit); i++ {
item, err := h.placements.Dequeue(timeout * time.Millisecond)
if err != nil {
h.metrics.GetPlacementFail.Inc(1)
break
}
placement := item.(*resmgr.Placement)
newPlacement := h.transitTasksInPlacement(placement,
[]t.TaskState{
t.TaskState_PLACED,
},
t.TaskState_LAUNCHING,
_reasonDequeuedForLaunch)
placements = append(placements, newPlacement)
h.metrics.GetPlacementSuccess.Inc(1)
}
response := resmgrsvc.GetPlacementsResponse{Placements: placements}
h.metrics.PlacementQueueLen.Update(float64(h.placements.Length()))
log.Debug("Get Placement Returned")
return &response, nil
}
// transitTasksInPlacement transitions tasks to the new state if their current
// state matches one of the expected states. Tasks that could not be
// transitioned are removed from the placement; they will be tried for
// placement again in the next cycle.
func (h *ServiceHandler) transitTasksInPlacement(
placement *resmgr.Placement,
expectedStates []t.TaskState,
newState t.TaskState,
reason string) *resmgr.Placement {
invalidTaskSet := make(map[string]struct{})
for _, task := range placement.GetTaskIDs() {
rmTask := h.rmTracker.GetTask(task.GetPelotonTaskID())
if rmTask == nil ||
task.GetMesosTaskID().GetValue() != rmTask.Task().GetTaskId().GetValue() {
invalidTaskSet[task.GetMesosTaskID().GetValue()] = struct{}{}
continue
}
state := rmTask.GetCurrentState().State
if !util.ContainsTaskState(expectedStates, state) {
log.WithFields(log.Fields{
"task_id": task.GetPelotonTaskID().GetValue(),
"expected_state": expectedStates,
"actual_state": state.String(),
"new_state": newState.String(),
}).Error("Failed to transit tasks in placement: " +
"task is not in expected state")
invalidTaskSet[task.GetMesosTaskID().GetValue()] = struct{}{}
} else {
err := rmTask.TransitTo(
newState.String(),
statemachine.WithReason(reason),
)
if err != nil {
log.WithError(err).
WithField("task_id", task.GetPelotonTaskID().GetValue()).
Info("Failed to transit tasks in placement")
invalidTaskSet[task.GetMesosTaskID().GetValue()] = struct{}{}
}
}
}
return h.removeTasksFromPlacements(placement, invalidTaskSet)
}
// NotifyTaskUpdates is called by the host manager to notify the resource
// manager of task status updates.
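// The response carries the highest acknowledged event offset, which the
// event stream uses as the purge offset.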
func (h *ServiceHandler) NotifyTaskUpdates(
ctx context.Context,
req *resmgrsvc.NotifyTaskUpdatesRequest) (*resmgrsvc.NotifyTaskUpdatesResponse, error) {
var response resmgrsvc.NotifyTaskUpdatesResponse
if len(req.Events) == 0 {
log.Warn("Empty events received by resource manager")
return &response, nil
}
for _, e := range req.Events {
h.handleEvent(e)
}
response.PurgeOffset = atomic.LoadUint64(h.maxOffset)
return &response, nil
}
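// handleEvent processes a single event from host manager: it acknowledges the
// event offset, transitions the tracked (or orphan) task to RUNNING when the
// mesos status reports it running, and marks the task done in the tracker
// when the status is terminal.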
func (h *ServiceHandler) handleEvent(event *pb_eventstream.Event) {
defer h.acknowledgeEvent(event.Offset)
if event.GetType() != pb_eventstream.Event_MESOS_TASK_STATUS {
return
}
taskState := util.MesosStateToPelotonState(
event.MesosTaskStatus.GetState())
if taskState != t.TaskState_RUNNING &&
!util.IsPelotonStateTerminal(taskState) {
return
}
mesosTask := event.GetMesosTaskStatus().GetTaskId().GetValue()
ptID, err := util.ParseTaskIDFromMesosTaskID(
mesosTask,
)
if err != nil {
log.WithFields(log.Fields{
"event": event,
"mesos_task_id": mesosTask,
}).Error("Could not parse mesos task ID")
return
}
taskID := &peloton.TaskID{
Value: ptID,
}
rmTask := h.rmTracker.GetTask(taskID)
if rmTask == nil ||
(rmTask.Task().GetTaskId().GetValue() != mesosTask) {
// It might be an orphan task event
rmTask = h.rmTracker.GetOrphanTask(mesosTask)
}
if rmTask == nil {
return
}
if taskState == t.TaskState_RUNNING {
err = rmTask.TransitTo(taskState.String(), statemachine.WithReason("task running"))
if err != nil {
log.WithError(errors.WithStack(err)).
WithField("task_id", ptID).
Info("Not able to transition to RUNNING for task")
}
return
}
// TODO: We probably want to terminate all the tasks in gang
err = h.rmTracker.MarkItDone(event.GetMesosTaskStatus().GetTaskId().GetValue())
if err != nil {
log.WithField("event", event).WithError(err).Error(
"Could not be updated")
return
}
log.WithFields(log.Fields{
"task_id": ptID,
"current_state": taskState.String(),
"mesos_task_id": rmTask.Task().TaskId.Value,
}).Info("Task is completed and removed from tracker")
// publish metrics
rmtask.GetTracker().UpdateMetrics(
t.TaskState_RUNNING,
taskState,
scalar.ConvertToResmgrResource(rmTask.Task().GetResource()),
)
}
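// acknowledgeEvent records the highest event offset seen so far; this offset
// is returned to host manager as the purge offset.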
func (h *ServiceHandler) acknowledgeEvent(offset uint64) {
log.WithField("offset", offset).
Debug("Event received by resource manager")
if offset > atomic.LoadUint64(h.maxOffset) {
atomic.StoreUint64(h.maxOffset, offset)
}
}
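// fillTaskEntry builds a GetActiveTasks task entry from the current state of
// the given RMTask.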
func (h *ServiceHandler) fillTaskEntry(task *rmtask.RMTask,
) *resmgrsvc.GetActiveTasksResponse_TaskEntry {
rmTaskState := task.GetCurrentState()
taskEntry := &resmgrsvc.GetActiveTasksResponse_TaskEntry{
TaskID: task.Task().GetTaskId().GetValue(),
TaskState: rmTaskState.State.String(),
Reason: rmTaskState.Reason,
LastUpdateTime: rmTaskState.LastUpdateTime.String(),
Hostname: task.Task().GetHostname(),
}
return taskEntry
}
// GetActiveTasks returns the active tasks in the scheduler based on the filters
// The filters can be particular task states, job ID or resource pool ID.
func (h *ServiceHandler) GetActiveTasks(
ctx context.Context,
req *resmgrsvc.GetActiveTasksRequest,
) (*resmgrsvc.GetActiveTasksResponse, error) {
var taskStates = map[string]*resmgrsvc.GetActiveTasksResponse_TaskEntries{}
log.WithField("req", req).Info("GetActiveTasks called")
taskStateMap := h.rmTracker.GetActiveTasks(
req.GetJobID(),
req.GetRespoolID(),
req.GetStates(),
)
for state, tasks := range taskStateMap {
for _, task := range tasks {
taskEntry := h.fillTaskEntry(task)
if _, ok := taskStates[state]; !ok {
var taskList resmgrsvc.GetActiveTasksResponse_TaskEntries
taskStates[state] = &taskList
}
taskStates[state].TaskEntry = append(
taskStates[state].GetTaskEntry(),
taskEntry,
)
}
}
log.Info("GetActiveTasks returned")
return &resmgrsvc.GetActiveTasksResponse{
TasksByState: taskStates,
}, nil
}
// GetPendingTasks returns the pending tasks from a resource pool in the
// order in which they were added up to a max limit number of gangs.
// E.g. specifying a limit of 10 would return pending tasks from the first 10
// gangs in the queue.
// The tasks are grouped according to their gang membership since a gang is the
// unit of admission.
func (h *ServiceHandler) GetPendingTasks(
ctx context.Context,
req *resmgrsvc.GetPendingTasksRequest,
) (*resmgrsvc.GetPendingTasksResponse, error) {
respoolID := req.GetRespoolID()
limit := req.GetLimit()
log.WithFields(log.Fields{
"respool_id": respoolID,
"limit": limit,
}).Info("GetPendingTasks called")
if respoolID == nil {
return &resmgrsvc.GetPendingTasksResponse{},
status.Errorf(codes.InvalidArgument,
"resource pool ID can't be nil")
}
node, err := h.resPoolTree.Get(&peloton.ResourcePoolID{
Value: respoolID.GetValue()})
if err != nil {
return &resmgrsvc.GetPendingTasksResponse{},
status.Errorf(codes.NotFound,
"resource pool ID not found:%s", respoolID)
}
if !node.IsLeaf() {
return &resmgrsvc.GetPendingTasksResponse{},
status.Errorf(codes.InvalidArgument,
"resource pool:%s is not a leaf node", respoolID)
}
// returns a list of pending resmgr.gangs for each queue
gangsInQueue, err := h.getPendingGangs(node, limit)
if err != nil {
return &resmgrsvc.GetPendingTasksResponse{},
status.Errorf(codes.Internal,
"failed to return pending tasks, err:%s", err.Error())
}
// marshal the response since we only care about task IDs
pendingGangs := make(map[string]*resmgrsvc.GetPendingTasksResponse_PendingGangs)
for q, gangs := range gangsInQueue {
var pendingGang []*resmgrsvc.GetPendingTasksResponse_PendingGang
for _, gang := range gangs {
var taskIDs []string
for _, task := range gang.GetTasks() {
taskIDs = append(taskIDs, task.GetId().GetValue())
}
pendingGang = append(pendingGang,
&resmgrsvc.GetPendingTasksResponse_PendingGang{
TaskIDs: taskIDs})
}
pendingGangs[q.String()] = &resmgrsvc.GetPendingTasksResponse_PendingGangs{
PendingGangs: pendingGang,
}
}
log.WithFields(log.Fields{
"respool_id": respoolID,
"limit": limit,
"pending_gangs": pendingGangs,
}).Debug("GetPendingTasks returned")
return &resmgrsvc.GetPendingTasksResponse{
PendingGangsByQueue: pendingGangs,
}, nil
}
func (h *ServiceHandler) getPendingGangs(node respool.ResPool,
limit uint32) (map[respool.QueueType][]*resmgrsvc.Gang,
error) {
var gangs []*resmgrsvc.Gang
var err error
gangsInQueue := make(map[respool.QueueType][]*resmgrsvc.Gang)
for _, q := range []respool.QueueType{
respool.PendingQueue,
respool.NonPreemptibleQueue,
respool.ControllerQueue,
respool.RevocableQueue} {
gangs, err = node.PeekGangs(q, limit)
if err != nil {
if _, ok := err.(r_queue.ErrorQueueEmpty); ok {
// queue is empty, move to the next one
continue
}
return gangsInQueue, errors.Wrap(err, "failed to peek pending gangs")
}
gangsInQueue[q] = gangs
}
return gangsInQueue, nil
}
// KillTasks kills the given tasks and removes them from the tracker
func (h *ServiceHandler) KillTasks(
ctx context.Context,
req *resmgrsvc.KillTasksRequest,
) (*resmgrsvc.KillTasksResponse, error) {
listTasks := req.GetTasks()
if len(listTasks) == 0 {
return &resmgrsvc.KillTasksResponse{
Error: []*resmgrsvc.KillTasksResponse_Error{
{
NotFound: &resmgrsvc.TasksNotFound{
Message: "Kill tasks called with no tasks",
},
},
},
}, nil
}
log.WithField("tasks", listTasks).Info("tasks to be killed")
var tasksNotFound []*peloton.TaskID
var tasksNotKilled []*peloton.TaskID
for _, taskTobeKilled := range listTasks {
rmTaskToKill := h.rmTracker.GetTask(taskTobeKilled)
if rmTaskToKill == nil {
tasksNotFound = append(tasksNotFound, taskTobeKilled)
continue
}
err := h.rmTracker.MarkItInvalid(rmTaskToKill.Task().GetTaskId().GetValue())
if err != nil {
tasksNotKilled = append(tasksNotKilled, taskTobeKilled)
continue
}
log.WithFields(log.Fields{
"task_id": taskTobeKilled.Value,
"current_state": rmTaskToKill.GetCurrentState().State.String(),
}).Info("Task is Killed and removed from tracker")
h.rmTracker.UpdateMetrics(
rmTaskToKill.GetCurrentState().State,
t.TaskState_KILLED,
scalar.ConvertToResmgrResource(rmTaskToKill.Task().GetResource()),
)
}
// release the hosts held for the tasks killed
resp, err := h.hostmgrClient.ReleaseHostsHeldForTasks(ctx, &hostsvc.ReleaseHostsHeldForTasksRequest{
Ids: listTasks,
})
if err != nil || resp.GetError() != nil {
// Ignore resp.Error to avoid a potential infinite retry;
// rely on host manager's periodic cleanup if the release fails.
// We may revisit this decision later.
log.WithFields(log.Fields{
"task_ids": listTasks,
"err":      err,
"resp_err": resp.GetError(),
}).Warn("Failed to release hosts held for tasks upon kill")
}
if err != nil {
return &resmgrsvc.KillTasksResponse{}, err
}
if len(tasksNotKilled) == 0 && len(tasksNotFound) == 0 {
return &resmgrsvc.KillTasksResponse{}, nil
}
var killResponseErr []*resmgrsvc.KillTasksResponse_Error
for _, task := range tasksNotFound {
killResponseErr = append(killResponseErr,
&resmgrsvc.KillTasksResponse_Error{
NotFound: &resmgrsvc.TasksNotFound{
Message: "Task Not Found",
Task: task,
},
})
}
for _, task := range tasksNotKilled {
killResponseErr = append(killResponseErr,
&resmgrsvc.KillTasksResponse_Error{
KillError: &resmgrsvc.KillTasksError{
Message: "Task can't be killed",
Task: task,
},
})
}
return &resmgrsvc.KillTasksResponse{
Error: killResponseErr,
}, nil
}
// GetPreemptibleTasks returns tasks which need to be preempted from the resource pool
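// Candidates are dequeued from the preemption queue (the request timeout is
// in milliseconds); candidates preempted to revoke resources are transitioned
// to PREEMPTING first, and candidates that are no longer tracked or fail the
// transition are skipped.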
func (h *ServiceHandler) GetPreemptibleTasks(
ctx context.Context,
req *resmgrsvc.GetPreemptibleTasksRequest) (*resmgrsvc.GetPreemptibleTasksResponse, error) {
log.WithField("request", req).Debug("GetPreemptibleTasks called.")
h.metrics.APIGetPreemptibleTasks.Inc(1)
limit := req.GetLimit()
timeout := time.Duration(req.GetTimeout())
var preemptionCandidates []*resmgr.PreemptionCandidate
for i := 0; i < int(limit); i++ {
preemptionCandidate, err := h.preemptionQueue.DequeueTask(timeout * time.Millisecond)
if err != nil {
// no more tasks
h.metrics.GetPreemptibleTasksTimeout.Inc(1)
break
}
if preemptionCandidate.GetReason() == resmgr.PreemptionReason_PREEMPTION_REASON_REVOKE_RESOURCES {
// Transition the task state machine to PREEMPTING
if rmTask := h.rmTracker.GetTask(preemptionCandidate.Id); rmTask != nil {
err = rmTask.TransitTo(
t.TaskState_PREEMPTING.String(), statemachine.WithReason("preemption triggered"))
if err != nil {
// the task could have moved from RUNNING state
log.WithError(err).
WithField("task_id", preemptionCandidate.Id.Value).
Error("failed to transit state for task")
continue
}
} else {
log.WithField("task_id", preemptionCandidate.Id.Value).
Error("failed to find task in the tracker")
continue
}
}
preemptionCandidates = append(preemptionCandidates, preemptionCandidate)
}
log.WithField("preemptible_tasks", preemptionCandidates).
Info("GetPreemptibleTasks returned")
h.metrics.GetPreemptibleTasksSuccess.Inc(1)
return &resmgrsvc.GetPreemptibleTasksResponse{
PreemptionCandidates: preemptionCandidates,
}, nil
}
// UpdateTasksState is called to notify the resource manager about tasks that
// have moved to the corresponding state, so that the resource manager can
// take appropriate actions for those tasks. For example, when tasks have been
// launched, the job manager notifies the resource manager so that it can stop
// the timer for the LAUNCHING state. Similarly, when a task fails to launch
// in the host manager due to a valid failure, the job manager tells the
// resource manager to kill the task, so that the resource manager can remove
// it from the tracker and do the relevant resource accounting.
func (h *ServiceHandler) UpdateTasksState(
ctx context.Context,
req *resmgrsvc.UpdateTasksStateRequest) (*resmgrsvc.UpdateTasksStateResponse, error) {
taskStateList := req.GetTaskStates()
if len(taskStateList) == 0 {
return &resmgrsvc.UpdateTasksStateResponse{}, nil
}
log.WithField("task_state_list", taskStateList).
Debug("tasks called with states")
h.metrics.APILaunchedTasks.Inc(1)
for _, updateEntry := range taskStateList {
id := updateEntry.GetTask()
// Check if the task is present in the tracker; if not,
// drop the update for that task.
task := h.rmTracker.GetTask(id)
if task == nil {
continue
}
// Check if the task is being moved to a terminal state
if util.IsPelotonStateTerminal(updateEntry.GetState()) {
err := h.rmTracker.MarkItDone(updateEntry.GetMesosTaskId().GetValue())
if err != nil {
log.WithError(err).WithFields(log.Fields{
"task_id": id,
"update_entry": updateEntry,
}).Error("could not update task")
}
continue
}
// Check that the mesos task id is the same;
// otherwise drop the update.
if *task.Task().TaskId.Value != *updateEntry.GetMesosTaskId().Value {
continue
}
err := task.TransitTo(updateEntry.GetState().String(),
statemachine.WithReason(
fmt.Sprintf("task moved to %s",
updateEntry.GetState().String())))
if err != nil {
log.WithError(err).
WithFields(log.Fields{
"task_id": id,
"to_state": updateEntry.GetState().String(),
}).Info("failed to transit")
continue
}
}
return &resmgrsvc.UpdateTasksStateResponse{}, nil
}
// GetOrphanTasks returns the list of orphan tasks
func (h *ServiceHandler) GetOrphanTasks(
ctx context.Context,
req *resmgrsvc.GetOrphanTasksRequest,
) (*resmgrsvc.GetOrphanTasksResponse, error) {
var orphanTasks []*resmgr.Task
for _, rmTask := range h.rmTracker.GetOrphanTasks(req.GetRespoolID()) {
orphanTasks = append(orphanTasks, rmTask.Task())
}
return &resmgrsvc.GetOrphanTasksResponse{
OrphanTasks: orphanTasks,
}, nil
}
// GetHostsByScores returns a list of batch hosts with lowest host scores
func (h *ServiceHandler) GetHostsByScores(
ctx context.Context,
req *resmgrsvc.GetHostsByScoresRequest,
) (*resmgrsvc.GetHostsByScoresResponse, error) {
return &resmgrsvc.GetHostsByScoresResponse{
Hosts: h.batchScorer.GetHostsByScores(req.Limit),
}, nil
}
// NewTestServiceHandler returns an empty new ServiceHandler ptr for testing.
func NewTestServiceHandler() *ServiceHandler {
return &ServiceHandler{}
}