in processor/mapjob/map_job_processor.go [79:128]
func (rcvr *MapJobProcessor) handleMapTask(jobCtx *jobcontext.JobContext, taskMaster taskmaster.TaskMaster, request *schedulerx.WorkerMapTaskRequest) (*schedulerx.WorkerMapTaskResponse, error) {
var (
resp *schedulerx.WorkerMapTaskResponse
jobInstanceId = request.GetJobInstanceId()
)
if taskMaster != nil {
if mapTaskMaster, ok := taskMaster.(taskmaster.MapTaskMaster); !ok {
resp = &schedulerx.WorkerMapTaskResponse{
Success: proto.Bool(false),
Message: proto.String("TaskMaster is not MapTaskMaster"),
}
if err := taskMaster.UpdateNewInstanceStatus(taskMaster.GetSerialNum(), processor.InstanceStatusFailed, "TaskMaster is not MapTaskMaster"); err != nil {
errMsg := fmt.Sprintf("jobInstanceId=%d, UpdateNewInstanceStatus failed, err=%s", jobInstanceId, err.Error())
logger.Errorf(errMsg)
return &schedulerx.WorkerMapTaskResponse{
Success: proto.Bool(false),
Message: proto.String(errMsg),
}, nil
}
} else {
startTime := time.Now()
overload, err := mapTaskMaster.Map(jobCtx, request.GetTaskBody(), request.GetTaskName())
if err != nil {
logger.Errorf("jobInstanceId=%s map failed, err=%s", err.Error())
if err := taskMaster.UpdateNewInstanceStatus(taskMaster.GetSerialNum(), processor.InstanceStatusFailed, err.Error()); err != nil {
errMsg := fmt.Sprintf("jobInstanceId=%d, UpdateNewInstanceStatus failed, err=%s", jobInstanceId, err.Error())
logger.Errorf(errMsg)
return &schedulerx.WorkerMapTaskResponse{
Success: proto.Bool(false),
Message: proto.String(errMsg),
}, nil
}
return nil, err
}
logger.Debugf("jobInstanceId=%d map, cost=%sms", jobInstanceId, time.Since(startTime).Milliseconds())
resp = &schedulerx.WorkerMapTaskResponse{
Success: proto.Bool(true),
Overload: proto.Bool(overload),
}
}
} else {
resp = &schedulerx.WorkerMapTaskResponse{
Success: proto.Bool(false),
Message: proto.String(fmt.Sprintf("can't found TaskMaster by jobInstanceId=%d", jobInstanceId)),
}
}
return resp, nil
}