in internal/master/map_task_master.go [410:498]
// BatchUpdateTaskStatues ingests a batch of task-status reports from workers:
// it updates the in-memory per-task and per-worker progress counters and the
// task result/status maps, records the root task's failure reason if present,
// and persists the de-duplicated final statuses with a bounded retry. If
// persistence keeps failing, the whole job instance is marked failed.
//
// NOTE(review): "Statues" is a long-standing typo for "Statuses"; the name is
// kept unchanged to preserve the external interface.
func (m *MapTaskMaster) BatchUpdateTaskStatues(requests []*schedulerx.ContainerReportTaskStatusRequest) {
	// Number of attempts to persist the batch before giving up; the failure
	// message below references the same bound.
	// FIXME if need 3 times?
	const maxPersistAttempts = 3

	// For each task keep only one report: the first one seen wins until a
	// finished status arrives, which always overwrites intermediate states.
	finalTaskStatus := make(map[int64]*schedulerx.ContainerReportTaskStatusRequest, len(requests))
	for _, request := range requests {
		taskStatus := taskstatus.TaskStatus(request.GetStatus())
		// Filter intermediate states
		if _, ok := finalTaskStatus[request.GetTaskId()]; !ok || taskStatus.IsFinished() {
			finalTaskStatus[request.GetTaskId()] = request
		}
		var (
			workerAddr = request.GetWorkerAddr()
			taskName   = request.GetTaskName()
		)
		logger.Debugf("report task status:%s from worker:%s, uniqueId:%s", taskStatus.Descriptor(), workerAddr,
			utils.GetUniqueId(request.GetJobId(), request.GetJobInstanceId(), request.GetTaskId()))
		// Lazily create the per-task and per-worker progress counters on first sight.
		m.taskProgressMap.LoadOrStore(taskName, common.NewTaskProgressCounter(taskName))
		if _, ok := m.workerProgressMap.Load(workerAddr); workerAddr != "" && !ok {
			m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
		}
		// Every report in the batch (including intermediate ones filtered out
		// of persistence above) still ticks the progress counters.
		switch taskStatus {
		case taskstatus.TaskStatusRunning:
			if val, ok := m.taskProgressMap.Load(taskName); ok {
				val.(*common.TaskProgressCounter).IncrementRunning()
			}
			if workerAddr != "" {
				if val, ok := m.workerProgressMap.Load(workerAddr); ok {
					val.(*common.WorkerProgressCounter).IncrementRunning()
				}
			}
		case taskstatus.TaskStatusSucceed:
			if val, ok := m.taskProgressMap.Load(taskName); ok {
				val.(*common.TaskProgressCounter).IncrementOneSuccess()
			}
			if workerAddr != "" {
				if val, ok := m.workerProgressMap.Load(workerAddr); ok {
					val.(*common.WorkerProgressCounter).IncrementSuccess()
				}
			}
		case taskstatus.TaskStatusFailed:
			if val, ok := m.taskProgressMap.Load(taskName); ok {
				val.(*common.TaskProgressCounter).IncrementOneFailed()
			}
			if workerAddr != "" {
				if val, ok := m.workerProgressMap.Load(workerAddr); ok {
					val.(*common.WorkerProgressCounter).IncrementOneFailed()
				}
			}
		}
		// update taskResultMap and taskStatusMap; later reports in the same
		// batch overwrite earlier ones for the same task.
		m.taskResultMap[request.GetTaskId()] = request.GetResult()
		m.taskStatusMap[request.GetTaskId()] = taskStatus
	}
	// Return the reason for the failure of the root task node.
	// It is possible to return two root task requests at the same time, which
	// are running and failed statuses; take the last one.
	if idx := len(requests) - 1; idx >= 0 &&
		requests[idx].GetStatus() == int32(taskstatus.TaskStatusFailed) &&
		requests[idx].GetTaskName() == constants.MapTaskRootName {
		m.SetRootTaskResult(requests[idx].GetResult())
	}
	allTaskStatus := make([]*schedulerx.ContainerReportTaskStatusRequest, 0, len(finalTaskStatus))
	for _, value := range finalTaskStatus {
		allTaskStatus = append(allTaskStatus, value)
	}
	// Start timing here so the "db cost" log below measures only the
	// persistence work, not the in-memory bookkeeping above.
	startTime := time.Now()
	updateSuccess := false
	for i := 0; i < maxPersistAttempts; i++ {
		if err := m.taskPersistence.UpdateTaskStatues(allTaskStatus); err != nil {
			logger.Errorf("jobInstanceId=%d, persistent batch updateTaskStatus error, err=%s", m.GetJobInstanceInfo().GetJobInstanceId(), err.Error())
			continue
		}
		updateSuccess = true
		break
	}
	if !updateSuccess {
		// Persistence is unrecoverable for this batch: fail the instance.
		err := m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, "persistent batch update TaskStatus error up to 3 times")
		if err != nil {
			logger.Errorf("jobInstanceId=%d, UpdateNewInstanceStatus failed, err=%s", m.GetJobInstanceInfo().GetJobInstanceId(), err.Error())
		}
	}
	logger.Debugf("jobInstanceId=%d batch update status db cost %dms", m.GetJobInstanceInfo().GetJobInstanceId(), time.Since(startTime).Milliseconds())
}