in internal/master/map_task_master.go [169:202]
// pullTask repeatedly pulls persisted tasks for the current job instance and
// submits them to the task blocking queue. It loops until the instance status
// reports finished.
//
// jobIdAndInstanceId is only used to identify the instance in log messages.
//
// Each iteration calls taskPersistence.Pull with m.pageSize. When the pull
// fails (timeout or any other error) or returns no tasks, the loop sleeps for
// retryInterval before trying again. When converting a pulled task into a
// start-container request fails, the instance is marked as failed through
// updateNewInstanceStatus and the rest of the current batch is skipped.
func (m *MapTaskMaster) pullTask(jobIdAndInstanceId string) {
	// Back-off applied after a failed or empty pull.
	const retryInterval = 10 * time.Second

	for !m.GetInstanceStatus().IsFinished() {
		jobInstanceId := m.GetJobInstanceInfo().GetJobInstanceId()
		startTime := time.Now()
		taskInfos, err := m.taskPersistence.Pull(jobInstanceId, m.pageSize)
		if err != nil {
			// Every pull error is reported and retried; the result of a
			// failed call is not inspected.
			if errors.Is(err, persistence.ErrTimeout) {
				logger.Errorf("pull task timeout, uniqueId: %s", jobIdAndInstanceId)
			} else {
				logger.Errorf("pull task failed, uniqueId: %s, err: %v", jobIdAndInstanceId, err)
			}
			time.Sleep(retryInterval)
			continue
		}
		logger.Debugf("jobInstanceId=%d, pull cost=%dms", jobInstanceId, time.Since(startTime).Milliseconds())

		if len(taskInfos) == 0 {
			logger.Debugf("pull task empty of jobInstanceId=%d, sleep 10s ...", jobInstanceId)
			time.Sleep(retryInterval)
			continue
		}

		for _, taskInfo := range taskInfos {
			taskName := taskInfo.TaskName()
			// A counter is created on first sight of a task name; when one was
			// already stored, its running count is decremented.
			// NOTE(review): presumably because a pulled task is being
			// re-dispatched rather than newly started; confirm against the
			// code that increments the counter.
			if counter, ok := m.taskProgressMap.LoadOrStore(taskName, common.NewTaskProgressCounter(taskName)); ok {
				counter.(*common.TaskProgressCounter).DecrementRunning()
			}

			req, err := m.convert2StartContainerRequest(m.GetJobInstanceInfo(), taskInfo.TaskId(), taskName, taskInfo.TaskBody(), true)
			if err != nil {
				errMsg := fmt.Sprintf("mapTaskMaster pull task failed, jobInstanceInfo=%+v, taskId=%d, taskName=%s, err=%s.", m.GetJobInstanceInfo(), taskInfo.TaskId(), taskName, err.Error())
				// errMsg is already formatted and may contain '%'; never use
				// it as the format string itself.
				logger.Errorf("%s", errMsg)
				m.updateNewInstanceStatus(m.GetSerialNum(), jobInstanceId, processor.InstanceStatusFailed, errMsg)
				// The instance has just been marked failed, so the rest of the
				// batch is abandoned; the outer loop re-checks the status.
				break
			}

			if m.taskBlockingQueue != nil {
				m.taskBlockingQueue.SubmitRequest(req)
			}
		}
	}
}