func()

in internal/master/broadcast_task_master.go [238:288]


// UpdateTaskStatus applies a ContainerReportTaskStatusRequest reported by a
// worker to this broadcast task master.
//
// A request whose serial number differs from the master's current serial
// number is rejected with an error. Requests are otherwise ignored (nil is
// returned) in two cases:
//   - the task's uniqueId is not present in taskStatusMap;
//   - the reported status equals the stored one, which is logged as a duplicate.
//
// In all remaining cases the method does the following:
//   - records the new status (a succeeded task is removed from taskStatusMap);
//   - bumps the reporting worker's progress counter;
//   - stores the task's result and status by taskId;
//   - calls updateNewInstanceStatus.
func (m *BroadcastTaskMaster) UpdateTaskStatus(request *schedulerx.ContainerReportTaskStatusRequest) error {
	if request.GetSerialNum() != m.GetSerialNum() {
		// Format directly with a constant format string; passing a pre-formatted
		// message as the format would misinterpret any '%' it contains.
		return fmt.Errorf("ignore ContainerReportTaskStatusRequest, current serialNum=%v, but request serialNum=%v.",
			m.GetSerialNum(), request.GetSerialNum())
	}
	var (
		jobId         = request.GetJobId()
		jobInstanceId = request.GetJobInstanceId()
		taskId        = request.GetTaskId()
		workerAddr    = request.GetWorkerAddr()
		result        = request.GetResult()

		taskStatus = taskstatus.TaskStatus(request.GetStatus())
		uniqueId   = utils.GetUniqueId(jobId, jobInstanceId, taskId)
	)
	logger.Infof("update task status serialNum=%v, uniqueId=%v, status=%v, workerAddr=%v", request.GetSerialNum(), uniqueId, taskStatus.Descriptor(), workerAddr)

	prevStatus, tracked := m.taskStatusMap.Load(uniqueId)
	if !tracked {
		// Not present in taskStatusMap (for example, already removed after an
		// earlier success report): nothing to update.
		return nil
	}
	if prevStatus.(taskstatus.TaskStatus) == taskStatus {
		logger.Warnf("duplicated ContainerReportTaskStatusRequest, uniqueId=%v, taskStatus=%v", uniqueId, taskStatus)
		return nil
	}

	if taskStatus == taskstatus.TaskStatusSucceed {
		// If a machine is finished running, it is directly removed from taskStatusMap.
		m.taskStatusMap.Delete(uniqueId)
	} else {
		// Record the latest non-success status (running, failed, ...).
		m.taskStatusMap.Store(uniqueId, taskStatus)
	}

	counter, found := m.workerProgressMap.Load(workerAddr)
	if !found {
		// LoadOrStore makes creation atomic: with a separate Load/Store pair,
		// two concurrent first reports from the same worker (if this method can
		// run concurrently) could each store a fresh counter, losing increments.
		counter, _ = m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
	}
	workerProgressCounter := counter.(*common.WorkerProgressCounter)
	switch taskStatus {
	case taskstatus.TaskStatusRunning:
		workerProgressCounter.IncrementRunning()
	case taskstatus.TaskStatusSucceed:
		workerProgressCounter.IncrementSuccess()
	case taskstatus.TaskStatusFailed:
		workerProgressCounter.IncrementOneFailed()
	}

	// update taskIdResultMap and taskIdStatusMap
	m.taskIdResultMap.Store(taskId, result)
	m.taskIdStatusMap.Store(taskId, taskStatus)

	m.updateNewInstanceStatus(request.GetSerialNum(), jobInstanceId, result)

	return nil
}