in processor/mapjob/map_job_processor.go [136:256]
func (rcvr *MapJobProcessor) Map(jobCtx *jobcontext.JobContext, taskList []interface{}, taskName string) (*processor.ProcessResult, error) {
result := processor.NewProcessResult(processor.WithFailed())
if len(taskList) == 0 {
result.SetResult("task list is empty")
return result, nil
}
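// Resolve the task master's actor PID from the job instance's master actor
// path; map requests must ultimately be handled by that master.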
workerAddr := actorcomm.GetRealWorkerAddr(jobCtx.InstanceMasterActorPath())
mapMasterPid := actorcomm.GetMapMasterPid(workerAddr)
if mapMasterPid == nil {
errMsg := fmt.Sprintf("get taskMaster akka path error, path=%v", jobCtx.InstanceMasterActorPath())
logger.Errorf("%s", errMsg)
result.SetResult(errMsg)
return result, nil
}
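// Page the task list into fixed-size batches so a large fan-out is sent as
// multiple bounded requests instead of one oversized message.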
batchSize := int(config.GetWorkerConfig().WorkerMapPageSize())
size := len(taskList)
quotient := size / batchSize
remainder := size % batchSize
// Dispatch taskList in batchNumber batches, each holding at most batchSize
// tasks: batchNumber = ceil(size / batchSize).
batchNumber := quotient
if remainder > 0 {
batchNumber = quotient + 1
}
logger.Infof("map task list, jobInstanceId=%d, taskName=%s, size=%d, batchSize=%d, batchNumber=%d",
jobCtx.JobInstanceId(), taskName, size, batchSize, batchNumber)
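// Pre-allocate one WorkerMapTaskRequest per batch.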
reqs := make([]*schedulerx.WorkerMapTaskRequest, 0, batchNumber)
for i := 0; i < batchNumber; i++ {
reqs = append(reqs, new(schedulerx.WorkerMapTaskRequest))
}
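// Serialize each task and spread the payloads across the batch requests in
// order; every serialized body must stay under the configured size limit.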
position := 0
maxTaskBodySize := int(config.GetWorkerConfig().TaskBodySizeMax())
for _, task := range taskList {
rcvr.checkTaskObject(jobCtx, task)
batchIdx := position / batchSize
position++
taskBody, err := json.Marshal(task)
if err != nil {
return nil, fmt.Errorf("json marshal task=%+v failed, err=%s", task, err.Error())
}
if len(taskBody) > maxTaskBodySize {
return nil, fmt.Errorf("taskBody size %dB exceeds the limit of %dB", len(taskBody), maxTaskBodySize)
}
// append works on a nil slice, so no nil check is needed here
reqs[batchIdx].TaskBody = append(reqs[batchIdx].TaskBody, taskBody)
}
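// Dispatch each batch: if this worker holds the map task master for the job
// instance, handle the request in-process; otherwise forward it to the
// master worker's actor.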
position = 0
for _, req := range reqs {
req.JobId = proto.Int64(jobCtx.JobId())
req.JobInstanceId = proto.Int64(jobCtx.JobInstanceId())
req.TaskId = proto.Int64(jobCtx.TaskId())
req.TaskName = proto.String(taskName)
var (
resp *schedulerx.WorkerMapTaskResponse
err error
ret interface{}
retryCount = 0
ok = false
)
taskMaster := rcvr.taskMasterPool.Get(req.GetJobInstanceId())
if isMapTaskMaster(taskMaster) {
// current worker is master worker
resp, err = rcvr.handleMapTask(jobCtx, taskMaster, req)
} else {
// current worker isn't master worker, forward request to master worker
ret, err = rcvr.actorSystem.Root.RequestFuture(mapMasterPid, req, 30*time.Second).Result()
if errors.Is(err, actor.ErrTimeout) {
logger.Warnf("JobInstanceId=%d WorkerMapTaskRequest dispatch failed, due to send request=%+v to taskMaster timeout.", req.GetJobInstanceId(), req)
for retryCount < maxRetryCount {
time.Sleep(10 * time.Millisecond)
ret, err = rcvr.actorSystem.Root.RequestFuture(mapMasterPid, req, 30*time.Second).Result()
retryCount++
}
if err != nil {
return nil, fmt.Errorf("JobInstanceId=%d WorkerMapTaskRequest dispatch failed, due to send request=%+v to taskMaster timeout after retry exceed %d times, err=%s ", req.GetJobInstanceId(), req, retryCount, err.Error())
}
}
if err == nil {
resp, ok = ret.(*schedulerx.WorkerMapTaskResponse)
if !ok {
err = fmt.Errorf("Response send request=%+v to taskMaster is not WorkerMapTaskResponse, response=%+v ", req, ret)
}
}
}
if err != nil {
return nil, fmt.Errorf("JobInstanceId=%d WorkerMapTaskRequest dispatch error, due to send request=%+v to taskMaster failed, err=%s ", req.GetJobInstanceId(), req, err.Error())
}
if !resp.GetSuccess() {
logger.Errorf("%s", resp.GetMessage())
result.SetResult(resp.GetMessage())
return result, nil
}
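// Drop the reference to the dispatched request so it can be garbage-collected
// while the remaining batches are sent.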
reqs[position] = nil
position++
if resp.GetOverload() {
logger.Warnf("Task Master is busy, sleeping a while 10s...")
time.Sleep(10 * time.Second)
}
}
result.SetSucceed()
return result, nil
}
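
// Usage sketch (illustrative, not part of the original file): a map-style
// processor that fans sub-tasks out through Map. MyMapProcessor, SubTask,
// and the root-task check via jobCtx.TaskName() are assumptions for
// illustration; only Map's signature and the ProcessResult calls used above
// come from this file.
type SubTask struct {
Id int64
}

type MyMapProcessor struct {
*MapJobProcessor // assumed embedding of the map-capable base processor
}

func (p *MyMapProcessor) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
if jobCtx.TaskName() == "" { // assumed marker for the root task
// Root task: build the sub-task list and fan it out; Map batches,
// serializes, and dispatches the tasks to the task master.
tasks := make([]interface{}, 0, 100)
for i := 0; i < 100; i++ {
tasks = append(tasks, &SubTask{Id: int64(i)})
}
return p.Map(jobCtx, tasks, "subTask")
}
// Child task: handle a single SubTask here.
result := processor.NewProcessResult(processor.WithFailed())
result.SetSucceed()
return result, nil
}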