in internal/master/broadcast_task_master.go [238:288]
// UpdateTaskStatus applies a task status report sent by a container to the
// broadcast master's bookkeeping.
//
// A report whose serial number differs from the master's current serial
// number is rejected with an error. A report for a uniqueId that is not in
// taskStatusMap is ignored, and so is one that repeats the status already
// recorded for it (that case logs a warning). Otherwise the method:
//   - removes the task from taskStatusMap on TaskStatusSucceed, or records the
//     new status there for any other status;
//   - increments the per-worker progress counter for running, succeed and
//     failed statuses (creating the counter on first use);
//   - records the reported result and status keyed by taskId;
//   - calls updateNewInstanceStatus with the request's serial number, the job
//     instance id and the result.
//
// It returns nil whenever the serial-number check passes.
func (m *BroadcastTaskMaster) UpdateTaskStatus(request *schedulerx.ContainerReportTaskStatusRequest) error {
	if request.GetSerialNum() != m.GetSerialNum() {
		// Pass format and arguments straight to fmt.Errorf: using a pre-built
		// message as the format string is flagged by go vet and would
		// misinterpret any '%' it contains.
		return fmt.Errorf("ignore ContainerReportTaskStatusRequest, current serialNum=%v, but request serialNum=%v.",
			m.GetSerialNum(), request.GetSerialNum())
	}

	var (
		jobId         = request.GetJobId()
		jobInstanceId = request.GetJobInstanceId()
		taskId        = request.GetTaskId()
		workerAddr    = request.GetWorkerAddr()
		taskStatus    = taskstatus.TaskStatus(request.GetStatus())
		uniqueId      = utils.GetUniqueId(jobId, jobInstanceId, taskId)
	)
	logger.Infof("update task status serialNum=%v, uniqueId=%v, status=%v, workerAddr=%v", request.GetSerialNum(), uniqueId, taskStatus.Descriptor(), workerAddr)

	val, ok := m.taskStatusMap.Load(uniqueId)
	if !ok {
		// Not tracked (never registered, or already removed after succeeding):
		// nothing to update.
		return nil
	}
	if val.(taskstatus.TaskStatus) == taskStatus {
		logger.Warnf("duplicated ContainerReportTaskStatusRequest, uniqueId=%v, taskStatus=%v", uniqueId, taskStatus)
		return nil
	}

	if taskStatus == taskstatus.TaskStatusSucceed {
		// If a machine is finished running, it is directly removed from taskStatusMap.
		m.taskStatusMap.Delete(uniqueId)
	} else {
		// Record the new non-success status (e.g. running or failed).
		m.taskStatusMap.Store(uniqueId, taskStatus)
	}

	// The fast path avoids allocating a counter when one already exists.
	// LoadOrStore on the slow path makes creation atomic, so concurrent reports
	// from the same worker cannot install two counters and lose increments.
	counter, ok := m.workerProgressMap.Load(workerAddr)
	if !ok {
		counter, _ = m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
	}
	workerProgressCounter := counter.(*common.WorkerProgressCounter)
	switch taskStatus {
	case taskstatus.TaskStatusRunning:
		workerProgressCounter.IncrementRunning()
	case taskstatus.TaskStatusSucceed:
		workerProgressCounter.IncrementSuccess()
	case taskstatus.TaskStatusFailed:
		workerProgressCounter.IncrementOneFailed()
	}

	// update taskResultMap and taskStatusMap
	m.taskIdResultMap.Store(taskId, request.GetResult())
	m.taskIdStatusMap.Store(taskId, taskStatus)
	m.updateNewInstanceStatus(request.GetSerialNum(), jobInstanceId, request.GetResult())
	return nil
}