in internal/batch/container_status_req_handler.go [53:184]
func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []interface{}, workerAddr string) {
reqs := make([]*schedulerx.ContainerReportTaskStatusRequest, 0, len(requests))
for _, req := range requests {
reqs = append(reqs, req.(*schedulerx.ContainerReportTaskStatusRequest))
}
if len(reqs) == 0 {
logger.Warnf("Process ContainerStatusReqHandler, but reqs is empty, jobInstanceId=%d, workerAddr=%s", jobInstanceId, workerAddr)
return
}
err := h.batchProcessSvc.Submit(func() {
if h.enableShareContainerPool {
// FIXME fix import cycle
// // 如果开启共享线程池,statues 可能会有多个 jobInstanceId,需要先 split 成不同的 list
// taskStatusRequestMap := make(map[*Pair][]*schedulerx.ContainerReportTaskStatusRequest)
// for _, req := range reqs {
// pair := &Pair{
// jobInstanceId: req.GetJobInstanceId(),
// serialNum: req.GetSerialNum(),
// }
// if _, ok := taskStatusRequestMap[pair]; ok {
// taskStatusRequestMap[pair] = append(taskStatusRequestMap[pair], req)
// } else {
// taskStatusRequestMap[pair] = []*schedulerx.ContainerReportTaskStatusRequest{req}
// }
// }
//
// // 针对不同的 jobInstanceId,构造 batchStatusRequests
// for pair, reqs := range taskStatusRequestMap {
// var (
// instanceMasterActorPath string
// finishCount = 0
// taskStatuses = make([]*schedulerx.TaskStatusInfo, 0, len(reqs))
// )
// for _, req := range reqs {
// finishCount := 0
// instanceMasterActorPath = req.GetInstanceMasterActorPath()
// taskStatusInfo := &schedulerx.TaskStatusInfo{
// TaskId: proto.Int64(req.GetTaskId()),
// Status: proto.Int32(req.GetStatus()),
// }
// if req.GetTaskName() != "" {
// taskStatusInfo.TaskName = proto.String(req.GetTaskName())
// }
// if req.GetResult() != "" {
// taskStatusInfo.Result = proto.String(req.GetResult())
// }
// if req.GetProgress() != "" {
// taskStatusInfo.Progress = proto.String(req.GetProgress())
// }
// if req.GetTraceId() != "" {
// taskStatusInfo.TraceId = proto.String(req.GetTraceId())
// }
// if common.TaskStatus(req.GetStatus()).IsFinished() {
// finishCount++
// }
// taskStatuses = append(taskStatuses, taskStatusInfo)
// }
//
// if instanceMasterActorPath != "" {
// taskStatusRequest := reqs[0]
// sharedThreadPool := container.GetThreadContainerPool().GetSharedThreadPool()
// if finishCount > 0 && sharedThreadPool != nil {
// // 可用大小可用线程数 + 线程数等量缓冲区
// // TODO implement it
// //metrics := common.Metrics{}
// //availableSize := float64(sharedThreadPool.Cap() - sharedThreadPool.Running()) +
// // (math.Sqrt(sharedThreadPool.Cap()) - float64(sharedThreadPool.Free())) + float64(finishCount)
// //metrics.setSharePoolAvailableSize(availableSize)
// }
// req := &schedulerx.ContainerBatchReportTaskStatuesRequest{
// GetJobId: taskStatusRequest.GetJobId,
// JobInstanceId: proto.Int64(pair.jobInstanceId),
// TaskStatues: taskStatuses,
// TaskMasterAkkaPath: proto.String(instanceMasterActorPath),
// WorkerAddr: taskStatusRequest.WorkerAddr,
// WorkerId: taskStatusRequest.WorkerId,
// SerialNum: proto.Int64(pair.serialNum),
// }
// actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
// Msg: req,
// }
// logger.Infof("jobInstanceId=%v, serialNum=%v, batch report status=%v to task master, size:%v",
// pair.jobInstanceId, pair.serialNum, taskStatusRequest.GetStatus(), len(taskStatuses))
// } else {
// logger.Errorf("instanceMasterActorPath is null, jobInstanceId=%d", jobInstanceId)
// }
// }
} else {
taskStatuses := make([]*schedulerx.TaskStatusInfo, 0, len(reqs))
// some attrs are duplicated in all reqs, for example: workAddr, workerId, jobId, jobInstanceId, taskMasterPath
// get first one used for all reqs.
taskStatusRequest := reqs[0]
for _, req := range reqs {
taskStatusInfo := &schedulerx.TaskStatusInfo{
TaskId: proto.Int64(req.GetTaskId()),
Status: proto.Int32(req.GetStatus()),
}
if req.GetTaskName() != "" {
taskStatusInfo.TaskName = proto.String(req.GetTaskName())
}
if req.GetResult() != "" {
taskStatusInfo.Result = proto.String(req.GetResult())
}
if req.GetProgress() != "" {
taskStatusInfo.Progress = proto.String(req.GetProgress())
}
if req.GetTraceId() != "" {
taskStatusInfo.TraceId = proto.String(req.GetTraceId())
}
taskStatuses = append(taskStatuses, taskStatusInfo)
}
req := &schedulerx.ContainerBatchReportTaskStatuesRequest{
JobId: taskStatusRequest.JobId,
JobInstanceId: taskStatusRequest.JobInstanceId,
TaskStatues: taskStatuses,
TaskMasterAkkaPath: taskStatusRequest.InstanceMasterActorPath,
WorkerAddr: taskStatusRequest.WorkerAddr,
WorkerId: taskStatusRequest.WorkerId,
SerialNum: taskStatusRequest.SerialNum,
}
actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: req,
}
}
h.activeRunnableNum.Dec()
})
if err != nil {
logger.Errorf("Process ContainerStatusReqHandler failed, submit to batchProcessSvc failed, err=%s", err.Error())
}
}