func()

in internal/batch/container_status_req_handler.go [53:184]


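// Process converts the raw batch items into ContainerReportTaskStatusRequest values, folds
// them into a single ContainerBatchReportTaskStatuesRequest, and forwards the batch to the
// task master through the at-least-once delivery channel. When the shared container pool is
// enabled, the per-jobInstanceId splitting is still disabled (see the FIXME below).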
func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []interface{}, workerAddr string) {
	reqs := make([]*schedulerx.ContainerReportTaskStatusRequest, 0, len(requests))
	for _, req := range requests {
		reqs = append(reqs, req.(*schedulerx.ContainerReportTaskStatusRequest))
	}
	if len(reqs) == 0 {
		logger.Warnf("Process ContainerStatusReqHandler, but reqs is empty, jobInstanceId=%d, workerAddr=%s", jobInstanceId, workerAddr)
		return
	}

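	// Hand the aggregation off to the batch-processing service; the submitted closure builds and sends the batch report.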
	err := h.batchProcessSvc.Submit(func() {
		if h.enableShareContainerPool {
			// FIXME fix import cycle
			// // If the shared thread pool is enabled, the statuses may span multiple jobInstanceIds, so split them into separate lists first.
			// // group requests by (jobInstanceId, serialNum); use Pair values (not pointers) as map keys so equal pairs share a bucket
			// taskStatusRequestMap := make(map[Pair][]*schedulerx.ContainerReportTaskStatusRequest)
			// for _, req := range reqs {
			//	pair := Pair{
			//		jobInstanceId: req.GetJobInstanceId(),
			//		serialNum:     req.GetSerialNum(),
			//	}
			//	taskStatusRequestMap[pair] = append(taskStatusRequestMap[pair], req)
			// }
			//
			// // Build a batch status request for each distinct jobInstanceId.
			// for pair, reqs := range taskStatusRequestMap {
			//	var (
			//		instanceMasterActorPath string
			//		finishCount             = 0
			//		taskStatuses            = make([]*schedulerx.TaskStatusInfo, 0, len(reqs))
			//	)
			//	for _, req := range reqs {
			//		instanceMasterActorPath = req.GetInstanceMasterActorPath()
			//		taskStatusInfo := &schedulerx.TaskStatusInfo{
			//			TaskId: proto.Int64(req.GetTaskId()),
			//			Status: proto.Int32(req.GetStatus()),
			//		}
			//		if req.GetTaskName() != "" {
			//			taskStatusInfo.TaskName = proto.String(req.GetTaskName())
			//		}
			//		if req.GetResult() != "" {
			//			taskStatusInfo.Result = proto.String(req.GetResult())
			//		}
			//		if req.GetProgress() != "" {
			//			taskStatusInfo.Progress = proto.String(req.GetProgress())
			//		}
			//		if req.GetTraceId() != "" {
			//			taskStatusInfo.TraceId = proto.String(req.GetTraceId())
			//		}
			//		if common.TaskStatus(req.GetStatus()).IsFinished() {
			//			finishCount++
			//		}
			//		taskStatuses = append(taskStatuses, taskStatusInfo)
			//	}
			//
			//	if instanceMasterActorPath != "" {
			//		taskStatusRequest := reqs[0]
			//		sharedThreadPool := container.GetThreadContainerPool().GetSharedThreadPool()
			//		if finishCount > 0 && sharedThreadPool != nil {
			//			// available size = number of idle threads + a buffer equal to the thread count
			//			// TODO implement it
			//			//metrics := common.Metrics{}
			//			//availableSize := float64(sharedThreadPool.Cap() - sharedThreadPool.Running()) +
			//			//	(math.Sqrt(sharedThreadPool.Cap()) - float64(sharedThreadPool.Free())) + float64(finishCount)
			//			//metrics.setSharePoolAvailableSize(availableSize)
			//		}
			//		req := &schedulerx.ContainerBatchReportTaskStatuesRequest{
			//			JobId:              taskStatusRequest.JobId,
			//			JobInstanceId:      proto.Int64(pair.jobInstanceId),
			//			TaskStatues:        taskStatuses,
			//			TaskMasterAkkaPath: proto.String(instanceMasterActorPath),
			//			WorkerAddr:         taskStatusRequest.WorkerAddr,
			//			WorkerId:           taskStatusRequest.WorkerId,
			//			SerialNum:          proto.Int64(pair.serialNum),
			//		}
			//		actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
			//			Msg: req,
			//		}
			//		logger.Infof("jobInstanceId=%v, serialNum=%v, batch report status=%v to task master, size:%v",
			//			pair.jobInstanceId, pair.serialNum, taskStatusRequest.GetStatus(), len(taskStatuses))
			//	} else {
			//		logger.Errorf("instanceMasterActorPath is empty, jobInstanceId=%d", jobInstanceId)
			//	}
			// }
		} else {
			taskStatuses := make([]*schedulerx.TaskStatusInfo, 0, len(reqs))
			// Some attributes (workerAddr, workerId, jobId, jobInstanceId, taskMasterPath) are identical across all reqs,
			// so they are taken from the first request.
			taskStatusRequest := reqs[0]
			for _, req := range reqs {
				taskStatusInfo := &schedulerx.TaskStatusInfo{
					TaskId: proto.Int64(req.GetTaskId()),
					Status: proto.Int32(req.GetStatus()),
				}
				if req.GetTaskName() != "" {
					taskStatusInfo.TaskName = proto.String(req.GetTaskName())
				}
				if req.GetResult() != "" {
					taskStatusInfo.Result = proto.String(req.GetResult())
				}
				if req.GetProgress() != "" {
					taskStatusInfo.Progress = proto.String(req.GetProgress())
				}
				if req.GetTraceId() != "" {
					taskStatusInfo.TraceId = proto.String(req.GetTraceId())
				}
				taskStatuses = append(taskStatuses, taskStatusInfo)
			}
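			// Assemble the single batch report for this group of statuses.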
			req := &schedulerx.ContainerBatchReportTaskStatuesRequest{
				JobId:              taskStatusRequest.JobId,
				JobInstanceId:      taskStatusRequest.JobInstanceId,
				TaskStatues:        taskStatuses,
				TaskMasterAkkaPath: taskStatusRequest.InstanceMasterActorPath,
				WorkerAddr:         taskStatusRequest.WorkerAddr,
				WorkerId:           taskStatusRequest.WorkerId,
				SerialNum:          taskStatusRequest.SerialNum,
			}
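			// Hand the wrapped request to the at-least-once delivery receiver for reliable delivery.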
			actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
				Msg: req,
			}
		}

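		// This runnable is finished; decrement the handler's active-runnable counter.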
		h.activeRunnableNum.Dec()
	})
	if err != nil {
		logger.Errorf("Process ContainerStatusReqHandler failed, submit to batchProcessSvc failed, err=%s", err.Error())
	}
}
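A minimal call-site sketch (everything below is an assumption for illustration: the handler h is constructed elsewhere, the literal values are made up, and only a few of the request fields are populated):

	// Hypothetical example: wrap raw status reports as []interface{} and hand them to the handler.
	requests := []interface{}{
		&schedulerx.ContainerReportTaskStatusRequest{
			JobId:         proto.Int64(1),   // illustrative job id
			JobInstanceId: proto.Int64(100), // illustrative instance id
			TaskId:        proto.Int64(1),   // illustrative task id
			Status:        proto.Int32(3),   // illustrative status code
		},
	}
	h.Process(100, requests, "127.0.0.1:1688") // illustrative worker address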