in internal/master/map_task_master.go [878:947]
// PostFinish runs the reduce step for a job instance and reports its outcome.
//
// It builds a JobContext from the master's job instance info together with the
// collected task results and task statuses, then looks up the job processor by
// the "jobName" field of the job content (or "className" when the job type is
// "java").
//
// Return values:
//   - a failed ProcessResult when no processor is found for the job name;
//   - nil when the processor does not implement processor.MapReduceJobProcessor,
//     or when the instance status is failed and RunReduceIfFail returns false;
//   - otherwise the result of Reduce. An error from Reduce, or a nil result
//     returned without an error, is converted into a failed ProcessResult.
//
// While reduce runs, the task and worker progress counters for the reduce task
// are updated (total/running before the call, success/failed after it).
func (m *MapTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult {
	// Fetch the instance info once instead of once per field.
	info := m.GetJobInstanceInfo()

	jobCtx := new(jobcontext.JobContext)
	jobCtx.SetJobId(info.GetJobId())
	jobCtx.SetJobInstanceId(jobInstanceId)
	jobCtx.SetJobType(info.GetJobType())
	jobCtx.SetContent(info.GetContent())
	jobCtx.SetScheduleTime(info.GetScheduleTime())
	jobCtx.SetDataTime(info.GetDataTime())
	jobCtx.SetJobParameters(info.GetParameters())
	jobCtx.SetInstanceParameters(info.GetInstanceParameters())
	jobCtx.SetUser(info.GetUser())
	jobCtx.SetTaskResults(m.taskResultMap)
	jobCtx.SetTaskStatuses(m.taskStatusMap)

	// Compatible with the existing Java language configuration mechanism:
	// java jobs carry the processor name under "className".
	nameKey := "jobName"
	if jobCtx.JobType() == "java" {
		nameKey = "className"
	}
	jobName := gjson.Get(jobCtx.Content(), nameKey).String()

	jobProcessor, found := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
	if !found {
		// NOTE(review): this branch is reached when Find reports no processor
		// for jobName, although the message talks about a failed cast. The text
		// is kept unchanged in case anything matches on it.
		failed := processor.NewProcessResult()
		failed.SetFailed()
		failed.SetResult(fmt.Sprintf("job=%s can not cast to MapReduceJobProcessor, must implement MapReduceJobProcessor interface to support reduce operations", jobName))
		return failed
	}

	mpProcessor, ok := jobProcessor.(processor.MapReduceJobProcessor)
	if !ok {
		// Processor has no reduce step; there is nothing to report.
		return nil
	}

	// RunReduceIfFail is invoked unconditionally, before the status check, so
	// the processor sees the call regardless of the instance status.
	runReduceIfFail := mpProcessor.RunReduceIfFail(jobCtx)
	if m.GetInstanceStatus() == processor.InstanceStatusFailed && !runReduceIfFail {
		logger.Warnf("jobInstanceId=%d is failed, skip reduce", jobInstanceId)
		return nil
	}

	// Account for the reduce task as one more running task, both per task name
	// and per worker address.
	reduceTaskName := constants.ReduceTaskName
	taskEntry, _ := m.taskProgressMap.LoadOrStore(reduceTaskName, common.NewTaskProgressCounter(reduceTaskName))
	taskCounter := taskEntry.(*common.TaskProgressCounter)
	taskCounter.IncrementOneTotal()
	taskCounter.IncrementRunning()

	workerAddr := m.actorCtx.ActorSystem().Address()
	workerEntry, _ := m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
	workerCounter := workerEntry.(*common.WorkerProgressCounter)
	workerCounter.IncrementTotal()
	workerCounter.IncrementRunning()

	result, err := mpProcessor.Reduce(jobCtx)
	switch {
	case err != nil:
		result = processor.NewProcessResult()
		result.SetFailed()
		result.SetResult("reduce exception: " + err.Error())
	case result == nil:
		// A processor returning (nil, nil) would otherwise cause a nil
		// dereference on result.Status() below; report it as a failure.
		result = processor.NewProcessResult()
		result.SetFailed()
		result.SetResult("reduce exception: reduce returned nil result without error")
	}

	// The counters are looked up again rather than reusing the pointers above,
	// so an entry removed from the maps while Reduce was running is skipped.
	succeeded := result.Status() == processor.InstanceStatusSucceed
	if val, ok := m.taskProgressMap.Load(reduceTaskName); ok {
		counter := val.(*common.TaskProgressCounter)
		if succeeded {
			counter.IncrementOneSuccess()
		} else {
			counter.IncrementOneFailed()
		}
	}
	if val, ok := m.workerProgressMap.Load(workerAddr); ok {
		counter := val.(*common.WorkerProgressCounter)
		if succeeded {
			counter.IncrementSuccess()
		} else {
			counter.IncrementOneFailed()
		}
	}
	return result
}