func()

in internal/actor/container_actor.go [114:157]


// handleBatchStartContainers handles a MasterBatchStartContainersRequest.
//
// It submits a single job to a.containerStarter that walks req.StartReqs in
// order and calls a.startContainer for each entry. For every entry whose
// start fails, a ContainerReportTaskStatusRequest carrying
// taskstatus.TaskStatusFailed is sent to the sender of the batch request.
//
// After submitting, it answers the sender with a
// MasterBatchStartContainersResponse. That response reflects only whether
// Submit accepted the job (Success=false plus the error text otherwise); it
// does not wait for, or summarize, the individual container starts.
//
// If the request carries no sender, nothing can be sent back and a warning is
// logged instead.
func (a *containerActor) handleBatchStartContainers(actorCtx actor.Context, req *schedulerx.MasterBatchStartContainersRequest) {
	// Resolve the sender once, while this message is still being handled. The
	// func passed to Submit may run after this handler has returned, at which
	// point actorCtx.Sender() may no longer refer to this request's sender.
	// NOTE(review): assumes containerStarter runs submitted funcs
	// asynchronously (worker pool) — confirm.
	senderPid := actorCtx.Sender()

	err := a.containerStarter.Submit(func() {
		for _, startReq := range req.StartReqs {
			uniqueID, err := a.startContainer(actorCtx, startReq)
			if err != nil {
				// Keep the cause in the log; the status report below only
				// carries the failed status, not the error itself.
				logger.Errorf("start container failed in handleBatchStartContainers, jobInstanceId=%d, taskId=%d, err=%s",
					startReq.GetJobInstanceId(), startReq.GetTaskId(), err.Error())

				// Report task fail status to task master.
				reportTaskStatusReq := &schedulerx.ContainerReportTaskStatusRequest{
					JobId:         proto.Int64(startReq.GetJobId()),
					JobInstanceId: proto.Int64(startReq.GetJobInstanceId()),
					TaskId:        proto.Int64(startReq.GetTaskId()),
					Status:        proto.Int32(int32(taskstatus.TaskStatusFailed)),
					WorkerId:      proto.String(utils.GetWorkerId()),
					WorkerAddr:    proto.String(actorCtx.ActorSystem().Address()),
				}
				// TaskName is optional: only set it when the request has one.
				if startReq.GetTaskName() != "" {
					reportTaskStatusReq.TaskName = proto.String(startReq.GetTaskName())
				}
				if senderPid != nil {
					actorCtx.Send(senderPid, reportTaskStatusReq)
				} else {
					logger.Warnf("Cannot send ContainerReportTaskStatusRequest due to sender is unknown in handleBatchStartContainers of containerActor, request=%+v", req)
				}
				// The container was not submitted, so skip the success log.
				continue
			}
			logger.Debugf("submit container to containerPool, uniqueId=%v, cost=%vms", uniqueID, time.Now().UnixMilli()-startReq.GetScheduleTime())
		}
	})

	// Success mirrors whether Submit accepted the job; Message is only set on
	// failure.
	resp := &schedulerx.MasterBatchStartContainersResponse{
		Success: proto.Bool(err == nil),
	}
	if err != nil {
		logger.Errorf("handleBatchStartContainers failed, err=%s", err.Error())
		resp.Message = proto.String(err.Error())
	}

	if senderPid == nil {
		logger.Warnf("Cannot send MasterBatchStartContainersResponse due to sender is unknown in handleBatchStartContainers of containerActor, request=%+v", req)
		return
	}
	actorCtx.Request(senderPid, resp)
}