func()

in internal/master/map_task_master.go [169:202]


func (m *MapTaskMaster) pullTask(jobIdAndInstanceId string) {
	for !m.GetInstanceStatus().IsFinished() {
		jobInstanceId := m.GetJobInstanceInfo().GetJobInstanceId()
		startTime := time.Now()
		taskInfos, err := m.taskPersistence.Pull(jobInstanceId, m.pageSize)
		if err != nil && errors.Is(err, persistence.ErrTimeout) {
			logger.Errorf("pull task timeout, uniqueId: %s", jobIdAndInstanceId)
			time.Sleep(10 * time.Second)
			continue
		}
		logger.Debugf("jobInstanceId=%d, pull cost=%dms", jobInstanceId, time.Since(startTime).Milliseconds())
		if len(taskInfos) == 0 {
			logger.Debugf("pull task empty of jobInstanceId=%d, sleep 10s ...", jobInstanceId)
			time.Sleep(10 * time.Second)
		} else {
			for _, taskInfo := range taskInfos {
				taskName := taskInfo.TaskName()
				if counter, ok := m.taskProgressMap.LoadOrStore(taskName, common.NewTaskProgressCounter(taskName)); ok {
					counter.(*common.TaskProgressCounter).DecrementRunning()
				}
				req, err := m.convert2StartContainerRequest(m.GetJobInstanceInfo(), taskInfo.TaskId(), taskInfo.TaskName(), taskInfo.TaskBody(), true)
				if err != nil {
					errMsg := fmt.Sprintf("mapTaskMaster pull task failed, jobInstanceInfo=%+v, taskId=%d, taskName=%s, err=%s.", m.GetJobInstanceInfo(), taskInfo.TaskId(), taskInfo.TaskName(), err.Error())
					logger.Errorf(errMsg)
					m.updateNewInstanceStatus(m.GetSerialNum(), jobInstanceId, processor.InstanceStatusFailed, errMsg)
					break // FIXME break or continue?
				}
				if m.taskBlockingQueue != nil {
					m.taskBlockingQueue.SubmitRequest(req)
				}
			}
		}
	}
}