in internal/master/map_task_master.go [204:260]
// checkInstanceStatus polls the task persistence layer once per
// MapMasterStatusCheckInterval until this master's instance status is
// finished, reporting each observed status through updateNewInstanceStatus.
//
// A finished status is not reported while taskDispatchReqHandler is still
// active. The root task may already have succeeded while sub tasks are still
// being created, and reporting it then would end the instance too early. In
// that case the loop backs off for one extra interval and polls again.
//
// When persistence reports success, the status is cross-checked against the
// per-task progress counters aggregated from taskProgressMap:
//   - if successes+failures < total, some tasks never reached a terminal
//     state: the instance is reported as failed with an explanatory result;
//   - otherwise, if any task failed, the instance is reported as failed;
//   - otherwise it is reported as succeeded.
//
// Errors from updateNewInstanceStatus are logged, not returned.
//
// NOTE(review): the loop has no cancellation path; it presumably relies on
// updateNewInstanceStatus moving GetInstanceStatus() to a finished state —
// confirm, since a persistently failing update keeps this loop alive.
func (m *MapTaskMaster) checkInstanceStatus() {
	checkInterval := config.GetWorkerConfig().MapMasterStatusCheckInterval()
	for !m.GetInstanceStatus().IsFinished() {
		time.Sleep(checkInterval)
		jobInstanceID := m.GetJobInstanceInfo().GetJobInstanceId()
		newStatus := m.taskPersistence.CheckInstanceStatus(jobInstanceID)
		if newStatus.IsFinished() && m.taskDispatchReqHandler.IsActive() {
			// Avoid wrongly finishing the instance early when the root task has
			// succeeded but sub tasks are still being created.
			time.Sleep(checkInterval)
			continue
		}

		result := m.GetRootTaskResult()
		if newStatus == processor.InstanceStatusSucceed {
			// A success reported by persistence must agree with the task counters.
			var failCnt, successCnt, totalCnt int64
			m.taskProgressMap.Range(func(_, value any) bool {
				counter := value.(*common.TaskProgressCounter)
				failCnt += counter.GetFailed()
				successCnt += counter.GetSuccess()
				totalCnt += counter.GetTotal()
				return true
			})
			switch {
			case successCnt+failCnt < totalCnt:
				// Some tasks never reached a terminal state.
				newStatus = processor.InstanceStatusFailed
				logger.Warnf("jobInstanceId=%d turn into finish status, but count isn't correct, successCnt:%d, failCnt:%d, totalCnt:%d", jobInstanceID, successCnt, failCnt, totalCnt)
				result = fmt.Sprintf("Turn into finish status, but count is wrong, sucCnt:%d, failCnt:%d, totalCnt:%d", successCnt, failCnt, totalCnt)
			case failCnt > 0:
				newStatus = processor.InstanceStatusFailed
			}
			// Otherwise newStatus stays InstanceStatusSucceed.
		}

		if err := m.updateNewInstanceStatus(m.GetSerialNum(), jobInstanceID, newStatus, result); err != nil {
			logger.Errorf("updateNewInstanceStatus failed, serialNum=%v, jobInstanceId=%v, newStatus=%v, result=%v, err=%s",
				m.GetSerialNum(), jobInstanceID, newStatus, result, err.Error())
		}
	}
}