func()

in internal/master/map_task_master.go [878:947]


// PostFinish runs the reduce step for the given job instance and returns its
// result.
//
// It builds a JobContext from the master's job instance info together with the
// collected task results and task statuses, then looks up the processor
// registered under the name stored in the job content ("className" for jobs of
// type "java", "jobName" otherwise).
//
// Return value:
//   - a failed result when the task pool has no entry for the job name;
//   - nil when the processor does not implement
//     processor.MapReduceJobProcessor, or when the instance status is failed
//     and the processor's RunReduceIfFail reports false;
//   - otherwise the result of Reduce. An error from Reduce, or a nil result
//     without an error, is converted into a failed result.
//
// Around the Reduce call the task progress counter (keyed by the reduce task
// name) and the worker progress counter (keyed by this actor system's address)
// are incremented for total/running and then for success or failure.
func (m *MapTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult {
	info := m.GetJobInstanceInfo()

	jobCtx := new(jobcontext.JobContext)
	jobCtx.SetJobId(info.GetJobId())
	jobCtx.SetJobInstanceId(jobInstanceId)
	jobCtx.SetJobType(info.GetJobType())
	jobCtx.SetContent(info.GetContent())
	jobCtx.SetScheduleTime(info.GetScheduleTime())
	jobCtx.SetDataTime(info.GetDataTime())
	jobCtx.SetJobParameters(info.GetParameters())
	jobCtx.SetInstanceParameters(info.GetInstanceParameters())
	jobCtx.SetUser(info.GetUser())
	jobCtx.SetTaskResults(m.taskResultMap)
	jobCtx.SetTaskStatuses(m.taskStatusMap)

	jobName := gjson.Get(jobCtx.Content(), "jobName").String()
	// Compatible with the existing Java language configuration mechanism
	if jobCtx.JobType() == "java" {
		jobName = gjson.Get(jobCtx.Content(), "className").String()
	}

	jobProcessor, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
	if !ok {
		// NOTE(review): the message below describes a cast failure, but this
		// branch is taken when Find reports !ok for jobName — confirm whether
		// the wording should say the processor is not registered.
		failed := processor.NewProcessResult()
		failed.SetFailed()
		failed.SetResult(fmt.Sprintf("job=%s can not cast to MapReduceJobProcessor, must implement MapReduceJobProcessor interface to support reduce operations", jobName))
		return failed
	}

	mpProcessor, ok := jobProcessor.(processor.MapReduceJobProcessor)
	if !ok {
		// Processor has no reduce step; there is nothing to report.
		return nil
	}

	// RunReduceIfFail is invoked unconditionally (as before) so that any side
	// effects of the user callback are preserved.
	runReduceIfFail := mpProcessor.RunReduceIfFail(jobCtx)
	if m.GetInstanceStatus() == processor.InstanceStatusFailed && !runReduceIfFail {
		logger.Warnf("jobInstanceId=%d is failed, skip reduce", jobInstanceId)
		return nil
	}

	// Register the reduce task as one more running task.
	reduceTaskName := constants.ReduceTaskName
	taskEntry, _ := m.taskProgressMap.LoadOrStore(reduceTaskName, common.NewTaskProgressCounter(reduceTaskName))
	taskProgress := taskEntry.(*common.TaskProgressCounter)
	taskProgress.IncrementOneTotal()
	taskProgress.IncrementRunning()

	// Reduce executes in this process, so it is accounted to this worker.
	workerAddr := m.actorCtx.ActorSystem().Address()
	workerEntry, _ := m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
	workerProgress := workerEntry.(*common.WorkerProgressCounter)
	workerProgress.IncrementTotal()
	workerProgress.IncrementRunning()

	result, err := mpProcessor.Reduce(jobCtx)
	switch {
	case err != nil:
		result = processor.NewProcessResult()
		result.SetFailed()
		result.SetResult("reduce exception: " + err.Error())
	case result == nil:
		// Guard against a processor returning (nil, nil); calling Status on a
		// nil result below would otherwise dereference a nil pointer.
		result = processor.NewProcessResult()
		result.SetFailed()
		result.SetResult("reduce exception: Reduce returned a nil result without an error")
	}

	// Record the outcome on whatever counters are currently stored in the maps.
	succeeded := result.Status() == processor.InstanceStatusSucceed
	if val, ok := m.taskProgressMap.Load(reduceTaskName); ok {
		counter := val.(*common.TaskProgressCounter)
		if succeeded {
			counter.IncrementOneSuccess()
		} else {
			counter.IncrementOneFailed()
		}
	}
	if val, ok := m.workerProgressMap.Load(workerAddr); ok {
		counter := val.(*common.WorkerProgressCounter)
		if succeeded {
			counter.IncrementSuccess()
		} else {
			counter.IncrementOneFailed()
		}
	}
	return result
}