func()

in internal/actor/task_actor.go [183:223]


// handleMapTask processes a WorkerMapTaskRequest by forwarding its task body
// and task name to the MapTaskMaster registered for the request's job
// instance, then replies to the sender with a WorkerMapTaskResponse.
//
// Outcomes:
//   - No TaskMaster is registered for the job instance: a failure response is
//     sent and nil is returned.
//   - The TaskMaster is not a taskmaster.MapTaskMaster: the instance is marked
//     failed, a failure response is sent and nil is returned.
//   - Map returns an error: the error is logged, the instance is marked
//     failed and a wrapped error is returned. No response is sent to the
//     sender on this path.
//   - Map succeeds: a success response carrying the overload flag is sent and
//     nil is returned.
//
// If the sender PID is unknown, the response is dropped and a warning is
// logged instead.
func (a *taskActor) handleMapTask(actorCtx actor.Context, req *schedulerx.WorkerMapTaskRequest) error {
	jobInstanceId := req.GetJobInstanceId()
	taskMaster := a.taskMasterPool.Get(jobInstanceId)

	var response *schedulerx.WorkerMapTaskResponse
	switch {
	case taskMaster == nil:
		response = &schedulerx.WorkerMapTaskResponse{
			Success: proto.Bool(false),
			Message: proto.String(fmt.Sprintf("can't found TaskMaster by jobInstanceId=%d", jobInstanceId)),
		}
	default:
		mapTaskMaster, ok := taskMaster.(taskmaster.MapTaskMaster)
		if !ok {
			const notMapMsg = "TaskMaster is not MapTaskMaster"
			response = &schedulerx.WorkerMapTaskResponse{
				Success: proto.Bool(false),
				Message: proto.String(notMapMsg),
			}
			taskMaster.UpdateNewInstanceStatus(taskMaster.GetSerialNum(), processor.InstanceStatusFailed, notMapMsg)
			break
		}

		startTime := time.Now()
		overload, err := mapTaskMaster.Map(nil, req.GetTaskBody(), req.GetTaskName())
		if err != nil {
			// Build the error once with %w so the cause stays inspectable via
			// errors.Is/errors.As; the rendered text is the same as before.
			mapErr := fmt.Errorf("handleMapTask failed, due to jobInstanceId=%v map error, err=%w", jobInstanceId, err)
			errMsg := mapErr.Error()
			// Never pass errMsg as the format string: a '%' inside the
			// underlying error text would be interpreted as a verb.
			logger.Errorf("%s", errMsg)
			taskMaster.UpdateNewInstanceStatus(taskMaster.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
			return mapErr
		}
		logger.Debugf("jobInstanceId=%v map, cost=%vms", jobInstanceId, time.Since(startTime).Milliseconds())
		response = &schedulerx.WorkerMapTaskResponse{
			Success:  proto.Bool(true),
			Overload: proto.Bool(overload),
		}
	}

	senderPid := actorCtx.Sender()
	if senderPid == nil {
		logger.Warnf("Cannot send WorkerMapTaskResponse due to sender is unknown in handleMapTask of taskActor, request=%+v", req)
		return nil
	}
	actorCtx.Send(senderPid, response)
	return nil
}