func()

in internal/master/map_task_master.go [204:260]


// checkInstanceStatus polls the persisted status of this job instance until
// m.GetInstanceStatus() reports a finished state, forwarding every status it
// accepts to updateNewInstanceStatus.
//
// Each round first sleeps for the configured MapMasterStatusCheckInterval.
// A finished status that is read while the task dispatch handler is still
// active is skipped for that round. A "succeed" status is cross-checked
// against the task progress counters and is downgraded to "failed" when some
// tasks are unaccounted for or when at least one task failed.
func (m *MapTaskMaster) checkInstanceStatus() {
	checkInterval := config.GetWorkerConfig().MapMasterStatusCheckInterval()
	for !m.GetInstanceStatus().IsFinished() {
		time.Sleep(checkInterval)

		jobInstanceID := m.GetJobInstanceInfo().GetJobInstanceId()
		newStatus := m.taskPersistence.CheckInstanceStatus(jobInstanceID)
		if newStatus.IsFinished() && m.taskDispatchReqHandler.IsActive() {
			// avoid wrong early finish instance in condition root task was success but sub tasks are still creating.
			// The progress counters are not needed here: the round is skipped
			// regardless of their values, so wait one more interval and re-check.
			time.Sleep(checkInterval)
			continue
		}

		result := m.GetRootTaskResult()
		if newStatus == processor.InstanceStatusSucceed {
			// A reported success is only trusted once the counters agree with it.
			var (
				failCnt    int64
				successCnt int64
				totalCnt   int64
			)
			m.taskProgressMap.Range(func(_, value any) bool {
				counter := value.(*common.TaskProgressCounter)
				failCnt += counter.GetFailed()
				successCnt += counter.GetSuccess()
				totalCnt += counter.GetTotal()
				return true
			})

			switch {
			case successCnt+failCnt < totalCnt:
				// Some tasks are neither succeeded nor failed: treat as failure
				// and replace the result with a diagnostic message.
				newStatus = processor.InstanceStatusFailed
				logger.Warnf("jobInstanceId=%d turn into finish status, but count isn't correct, successCnt:%d, failCnt:%d, totalCnt:%d", jobInstanceID, successCnt, failCnt, totalCnt)
				result = fmt.Sprintf("Turn into finish status, but count is wrong, sucCnt:%d, failCnt:%d, totalCnt:%d", successCnt, failCnt, totalCnt)
			case failCnt > 0:
				// All tasks are accounted for but at least one failed.
				newStatus = processor.InstanceStatusFailed
			}
			// Otherwise newStatus stays InstanceStatusSucceed.
		}

		if err := m.updateNewInstanceStatus(m.GetSerialNum(), jobInstanceID, newStatus, result); err != nil {
			logger.Errorf("updateNewInstanceStatus failed, serialNum=%v, jobInstanceId=%v, newStatus=%v, result=%v, err=%s",
				m.GetSerialNum(), jobInstanceID, newStatus, result, err.Error())
		}
	}
}