in internal/actor/task_actor.go [183:223]
func (a *taskActor) handleMapTask(actorCtx actor.Context, req *schedulerx.WorkerMapTaskRequest) error {
var (
jobInstanceId = req.GetJobInstanceId()
response = new(schedulerx.WorkerMapTaskResponse)
)
if taskMaster := a.taskMasterPool.Get(jobInstanceId); taskMaster != nil {
if mapTaskMaster, ok := taskMaster.(taskmaster.MapTaskMaster); ok {
startTime := time.Now().UnixMilli()
overload, err := mapTaskMaster.Map(nil, req.GetTaskBody(), req.GetTaskName())
if err != nil {
errMsg := fmt.Sprintf("handleMapTask failed, due to jobInstanceId=%v map error, err=%v", jobInstanceId, err.Error())
logger.Errorf(errMsg)
taskMaster.UpdateNewInstanceStatus(taskMaster.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
return fmt.Errorf(errMsg)
}
logger.Debugf("jobInstanceId=%v map, cost=%vms", jobInstanceId, time.Now().UnixMilli()-startTime)
response = &schedulerx.WorkerMapTaskResponse{
Success: proto.Bool(true),
Overload: proto.Bool(overload),
}
} else {
response = &schedulerx.WorkerMapTaskResponse{
Success: proto.Bool(false),
Message: proto.String("TaskMaster is not MapTaskMaster"),
}
taskMaster.UpdateNewInstanceStatus(taskMaster.GetSerialNum(), processor.InstanceStatusFailed, "TaskMaster is not MapTaskMaster")
}
} else {
response = &schedulerx.WorkerMapTaskResponse{
Success: proto.Bool(false),
Message: proto.String(fmt.Sprintf("can't found TaskMaster by jobInstanceId=%d", jobInstanceId)),
}
}
if senderPid := actorCtx.Sender(); senderPid != nil {
actorCtx.Send(senderPid, response)
} else {
logger.Warnf("Cannot send WorkerMapTaskResponse due to sender is unknown in handleMapTask of taskActor, request=%+v", req)
}
return nil
}