func()

in pkg/hostmgr/handler.go [703:838]


// LaunchTasks validates the launch request, releases host holds for tasks
// that are being launched on a host other than the one held for them, claims
// the host offer and launches the requested tasks as pods through the plugin.
//
// On failure the response carries the matching error variant
// (InvalidArgument, InvalidOffers or LaunchFailure) and a non-nil error is
// returned alongside it. Any returned error is passed through
// yarpcutil.ConvertToYARPCError by the deferred function before it reaches
// the caller.
func (h *ServiceHandler) LaunchTasks(
	ctx context.Context,
	req *hostsvc.LaunchTasksRequest,
) (response *hostsvc.LaunchTasksResponse, err error) {

	// Runs after the named result err has been set by whichever return fires.
	defer func() {
		if err != nil {
			err = yarpcutil.ConvertToYARPCError(err)
		}
	}()

	if invalidErr := validateLaunchTasks(req); invalidErr != nil {
		invalidErr = yarpcerrors.InvalidArgumentErrorf("%s", invalidErr)
		log.WithFields(log.Fields{
			"hostname":       req.GetHostname(),
			"host_offer_id":  req.GetId(),
			"mesos_agent_id": req.GetAgentId(),
		}).WithError(invalidErr).Error("validate launch tasks failed")
		h.metrics.LaunchTasksInvalid.Inc(1)

		failure := &hostsvc.LaunchTasksResponse_Error{
			InvalidArgument: &hostsvc.InvalidArgument{
				Message: invalidErr.Error(),
			},
		}
		return &hostsvc.LaunchTasksResponse{Error: failure},
			errors.Wrap(invalidErr, "validate launch tasks failed")
	}

	targetHost := req.GetHostname()
	tasks := req.GetTasks()

	// Group the requested tasks by the host currently held for them; tasks
	// without a held host are left out of the map.
	heldByHost := make(map[string][]*peloton.TaskID)
	for _, task := range tasks {
		taskID := task.GetId()
		heldHost := h.offerPool.GetHostHeldForTask(taskID)
		if len(heldHost) == 0 {
			continue
		}
		heldByHost[heldHost] = append(heldByHost[heldHost], taskID)
	}

	// Holds on any host other than the launch target are released. A failed
	// release is only logged and does not abort the launch.
	for heldHost, heldTaskIDs := range heldByHost {
		if heldHost == targetHost {
			continue
		}

		log.WithFields(log.Fields{
			"task_ids":      heldTaskIDs,
			"host_held":     heldHost,
			"host_launched": targetHost,
		}).Info("task not launched on the host held")

		releaseErr := h.offerPool.ReleaseHoldForTasks(heldHost, heldTaskIDs)
		if releaseErr != nil {
			log.WithFields(log.Fields{
				"task_ids":  heldTaskIDs,
				"host_held": heldHost,
				"error":     releaseErr,
			}).Warn("fail to release held host when launching tasks on other hosts")
		}
	}

	if _, err = h.offerPool.ClaimForLaunch(
		targetHost,
		req.GetId().GetValue(),
		tasks,
		heldByHost[targetHost]...,
	); err != nil {
		log.WithFields(log.Fields{
			"hostname":       targetHost,
			"host_offer_id":  req.GetId(),
			"mesos_agent_id": req.GetAgentId(),
		}).WithError(err).Error("claim for launch failed")
		h.metrics.LaunchTasksInvalidOffers.Inc(1)

		failure := &hostsvc.LaunchTasksResponse_Error{
			InvalidOffers: &hostsvc.InvalidOffers{
				Message: err.Error(),
			},
		}
		return &hostsvc.LaunchTasksResponse{Error: failure},
			errors.Wrap(err, "claim for launch failed")
	}

	// temporary workaround to add hosts into cache. This step
	// was part of ClaimForLaunch
	h.hostCache.AddPodsToHost(tasks, targetHost)

	// Convert each task into a launchable pod. A task whose mesos task ID
	// cannot be parsed is logged and left out of the launch.
	var launchablePods []*models.LaunchablePod
	for _, task := range tasks {
		mesosTaskID := task.GetTaskId()
		jobID, instanceID, parseErr := util.ParseJobAndInstanceID(mesosTaskID.GetValue())
		if parseErr != nil {
			log.WithFields(
				log.Fields{
					"mesos_id": mesosTaskID.GetValue(),
				}).WithError(parseErr).
				Error("fail to parse ID when constructing launchable pods in LaunchTask")
			continue
		}

		pod := &models.LaunchablePod{
			PodId: util.CreatePodIDFromMesosTaskID(mesosTaskID),
			Spec:  api.ConvertTaskConfigToPodSpec(task.GetConfig(), jobID, instanceID),
			Ports: task.Ports,
		}
		launchablePods = append(launchablePods, pod)
	}

	launchedPods, err := h.plugin.LaunchPods(ctx, launchablePods, targetHost)
	if err != nil {
		h.metrics.LaunchTasksFail.Inc(int64(len(tasks)))
		log.WithFields(log.Fields{
			"error":         err,
			"host_offer_id": req.GetId().GetValue(),
		}).Warn("Tasks launch failure")

		failure := &hostsvc.LaunchTasksResponse_Error{
			LaunchFailure: &hostsvc.LaunchFailure{
				Message: err.Error(),
			},
		}
		return &hostsvc.LaunchTasksResponse{Error: failure},
			errors.Wrap(err, "task launch failed")
	}

	h.metrics.LaunchTasks.Inc(int64(len(launchedPods)))

	var launchedIDs []string
	for i := range launchedPods {
		launchedIDs = append(launchedIDs, launchedPods[i].PodId.GetValue())
	}

	log.WithFields(log.Fields{
		"task_ids":      launchedIDs,
		"hostname":      targetHost,
		"host_offer_id": req.GetId().GetValue(),
	}).Info("LaunchTasks")

	return &hostsvc.LaunchTasksResponse{}, nil
}