func()

in processor/mapjob/map_job_processor.go [79:128]


// handleMapTask forwards a worker's map request to the task master of the
// owning job instance and converts the outcome into a WorkerMapTaskResponse.
//
// Outcomes:
//   - taskMaster is nil: a Success=false response naming the job instance id.
//   - taskMaster is not a taskmaster.MapTaskMaster: the instance is marked
//     failed and a Success=false response is returned.
//   - Map returns an error: the instance is marked failed and the Map error is
//     returned with a nil response.
//   - Map succeeds: a Success=true response carrying the overload flag.
//
// Whenever marking the instance failed itself fails, that failure is logged and
// reported in a Success=false response with a nil error, taking precedence over
// the outcome that triggered it.
func (rcvr *MapJobProcessor) handleMapTask(jobCtx *jobcontext.JobContext, taskMaster taskmaster.TaskMaster, request *schedulerx.WorkerMapTaskRequest) (*schedulerx.WorkerMapTaskResponse, error) {
	jobInstanceId := request.GetJobInstanceId()

	if taskMaster == nil {
		return &schedulerx.WorkerMapTaskResponse{
			Success: proto.Bool(false),
			Message: proto.String(fmt.Sprintf("can't found TaskMaster by jobInstanceId=%d", jobInstanceId)),
		}, nil
	}

	// markFailed records the instance as failed with the given reason. It
	// returns nil when the status update succeeds; otherwise it logs the update
	// failure and returns the response describing it.
	markFailed := func(reason string) *schedulerx.WorkerMapTaskResponse {
		updateErr := taskMaster.UpdateNewInstanceStatus(taskMaster.GetSerialNum(), processor.InstanceStatusFailed, reason)
		if updateErr == nil {
			return nil
		}
		errMsg := fmt.Sprintf("jobInstanceId=%d, UpdateNewInstanceStatus failed, err=%s", jobInstanceId, updateErr.Error())
		// errMsg is already formatted; pass it as an argument so that any '%'
		// in the underlying error text is not interpreted as a format verb.
		logger.Errorf("%s", errMsg)
		return &schedulerx.WorkerMapTaskResponse{
			Success: proto.Bool(false),
			Message: proto.String(errMsg),
		}
	}

	mapTaskMaster, ok := taskMaster.(taskmaster.MapTaskMaster)
	if !ok {
		const reason = "TaskMaster is not MapTaskMaster"
		if failResp := markFailed(reason); failResp != nil {
			return failResp, nil
		}
		return &schedulerx.WorkerMapTaskResponse{
			Success: proto.Bool(false),
			Message: proto.String(reason),
		}, nil
	}

	startTime := time.Now()
	overload, err := mapTaskMaster.Map(jobCtx, request.GetTaskBody(), request.GetTaskName())
	if err != nil {
		logger.Errorf("jobInstanceId=%d map failed, err=%s", jobInstanceId, err.Error())
		if failResp := markFailed(err.Error()); failResp != nil {
			return failResp, nil
		}
		// The instance was marked failed; surface the original Map error.
		return nil, err
	}

	logger.Debugf("jobInstanceId=%d map, cost=%dms", jobInstanceId, time.Since(startTime).Milliseconds())
	return &schedulerx.WorkerMapTaskResponse{
		Success:  proto.Bool(true),
		Overload: proto.Bool(overload),
	}, nil
}