in pkg/hostmgr/handler.go [703:838]
func (h *ServiceHandler) LaunchTasks(
ctx context.Context,
req *hostsvc.LaunchTasksRequest,
) (response *hostsvc.LaunchTasksResponse, err error) {
defer func() {
if err != nil {
err = yarpcutil.ConvertToYARPCError(err)
return
}
}()
if err := validateLaunchTasks(req); err != nil {
err = yarpcerrors.InvalidArgumentErrorf("%s", err)
log.WithFields(log.Fields{
"hostname": req.GetHostname(),
"host_offer_id": req.GetId(),
"mesos_agent_id": req.GetAgentId(),
}).WithError(err).Error("validate launch tasks failed")
h.metrics.LaunchTasksInvalid.Inc(1)
return &hostsvc.LaunchTasksResponse{
Error: &hostsvc.LaunchTasksResponse_Error{
InvalidArgument: &hostsvc.InvalidArgument{
Message: err.Error(),
},
},
}, errors.Wrap(err, "validate launch tasks failed")
}
hostToTaskIDs := make(map[string][]*peloton.TaskID)
for _, launchableTask := range req.GetTasks() {
hostHeld := h.offerPool.GetHostHeldForTask(launchableTask.GetId())
if len(hostHeld) != 0 {
hostToTaskIDs[hostHeld] =
append(hostToTaskIDs[hostHeld], launchableTask.GetId())
}
}
for hostname, taskIDs := range hostToTaskIDs {
if hostname != req.GetHostname() {
log.WithFields(log.Fields{
"task_ids": taskIDs,
"host_held": hostname,
"host_launched": req.GetHostname(),
}).Info("task not launched on the host held")
if err := h.offerPool.ReleaseHoldForTasks(hostname, taskIDs); err != nil {
log.WithFields(log.Fields{
"task_ids": taskIDs,
"host_held": hostname,
"error": err,
}).Warn("fail to release held host when launching tasks on other hosts")
continue
}
}
}
_, err = h.offerPool.ClaimForLaunch(
req.GetHostname(),
req.GetId().GetValue(),
req.GetTasks(),
hostToTaskIDs[req.GetHostname()]...,
)
if err != nil {
log.WithFields(log.Fields{
"hostname": req.GetHostname(),
"host_offer_id": req.GetId(),
"mesos_agent_id": req.GetAgentId(),
}).WithError(err).Error("claim for launch failed")
h.metrics.LaunchTasksInvalidOffers.Inc(1)
return &hostsvc.LaunchTasksResponse{
Error: &hostsvc.LaunchTasksResponse_Error{
InvalidOffers: &hostsvc.InvalidOffers{
Message: err.Error(),
},
},
}, errors.Wrap(err, "claim for launch failed")
}
// temporary workaround to add hosts into cache. This step
// was part of ClaimForLaunch
h.hostCache.AddPodsToHost(req.GetTasks(), req.GetHostname())
var launchablePods []*models.LaunchablePod
for _, task := range req.GetTasks() {
jobID, instanceID, err := util.ParseJobAndInstanceID(task.GetTaskId().GetValue())
if err != nil {
log.WithFields(
log.Fields{
"mesos_id": task.GetTaskId().GetValue(),
}).WithError(err).
Error("fail to parse ID when constructing launchable pods in LaunchTask")
continue
}
launchablePods = append(launchablePods, &models.LaunchablePod{
PodId: util.CreatePodIDFromMesosTaskID(task.GetTaskId()),
Spec: api.ConvertTaskConfigToPodSpec(task.GetConfig(), jobID, instanceID),
Ports: task.Ports,
})
}
launchedPods, err := h.plugin.LaunchPods(ctx, launchablePods, req.GetHostname())
if err != nil {
h.metrics.LaunchTasksFail.Inc(int64(len(req.GetTasks())))
log.WithFields(log.Fields{
"error": err,
"host_offer_id": req.GetId().GetValue(),
}).Warn("Tasks launch failure")
return &hostsvc.LaunchTasksResponse{
Error: &hostsvc.LaunchTasksResponse_Error{
LaunchFailure: &hostsvc.LaunchFailure{
Message: err.Error(),
},
},
}, errors.Wrap(err, "task launch failed")
}
h.metrics.LaunchTasks.Inc(int64(len(launchedPods)))
var taskIDs []string
for _, pod := range launchedPods {
taskIDs = append(taskIDs, pod.PodId.GetValue())
}
log.WithFields(log.Fields{
"task_ids": taskIDs,
"hostname": req.GetHostname(),
"host_offer_id": req.GetId().GetValue(),
}).Info("LaunchTasks")
return &hostsvc.LaunchTasksResponse{}, nil
}