in internal/actor/container_actor.go [114:157]
func (a *containerActor) handleBatchStartContainers(actorCtx actor.Context, req *schedulerx.MasterBatchStartContainersRequest) {
err := a.containerStarter.Submit(func() {
for _, startReq := range req.StartReqs {
uniqueId, err := a.startContainer(actorCtx, startReq)
if err != nil {
// report task fail status to task master
reportTaskStatusReq := &schedulerx.ContainerReportTaskStatusRequest{
JobId: proto.Int64(startReq.GetJobId()),
JobInstanceId: proto.Int64(startReq.GetJobInstanceId()),
TaskId: proto.Int64(startReq.GetTaskId()),
Status: proto.Int32(int32(taskstatus.TaskStatusFailed)),
WorkerId: proto.String(utils.GetWorkerId()),
WorkerAddr: proto.String(actorCtx.ActorSystem().Address()),
}
if startReq.GetTaskName() != "" {
reportTaskStatusReq.TaskName = proto.String(startReq.GetTaskName())
}
if senderPid := actorCtx.Sender(); senderPid != nil {
actorCtx.Send(senderPid, reportTaskStatusReq)
} else {
logger.Warnf("Cannot send ContainerReportTaskStatusRequest due to sender is unknown in handleBatchStartContainers of containerActor, request=%+v", req)
}
}
logger.Debugf("submit container to containerPool, uniqueId=%v, cost=%vms", uniqueId, time.Now().UnixMilli()-startReq.GetScheduleTime())
}
})
resp := new(schedulerx.MasterBatchStartContainersResponse)
if err != nil {
logger.Errorf("handleBatchStartContainers failed, err=%s", err.Error())
resp = &schedulerx.MasterBatchStartContainersResponse{
Success: proto.Bool(false),
Message: proto.String(err.Error()),
}
} else {
resp = &schedulerx.MasterBatchStartContainersResponse{
Success: proto.Bool(true),
}
}
if senderPid := actorCtx.Sender(); senderPid != nil {
actorCtx.Request(senderPid, resp)
} else {
logger.Warnf("Cannot send MasterBatchStartContainersResponse due to sender is unknown in handleBatchStartContainers of containerActor, request=%+v", req)
}
}