func()

in processor/mapjob/map_job_processor.go [136:256]


// Map splits taskList into batches of at most WorkerMapPageSize tasks and
// dispatches each batch, as a WorkerMapTaskRequest, to the map task master of
// the job instance described by jobCtx.
//
// If the task master for the job instance is found in rcvr.taskMasterPool and
// is a map task master, each batch is handled locally via handleMapTask.
// Otherwise the batch is sent to the master's actor (resolved from
// jobCtx.InstanceMasterActorPath) with a 30s request timeout. On timeout the
// request is retried up to maxRetryCount times, stopping at the first success.
//
// Every task is JSON-marshaled and must not exceed TaskBodySizeMax bytes.
// Batches are sent sequentially; when the master reports overload the method
// sleeps 10s before continuing.
//
// Returns:
//   - a failed ProcessResult and nil error when taskList is empty, the master
//     actor cannot be resolved, or the master rejects a batch;
//   - a nil result and non-nil error when the configured page size is not
//     positive, a task cannot be marshaled or is too large, or a batch cannot
//     be delivered;
//   - a succeeded ProcessResult once every batch has been accepted.
func (rcvr *MapJobProcessor) Map(jobCtx *jobcontext.JobContext, taskList []interface{}, taskName string) (*processor.ProcessResult, error) {
	result := processor.NewProcessResult(processor.WithFailed())

	if len(taskList) == 0 {
		result.SetResult("task list is empty")
		return result, nil
	}

	workerAddr := actorcomm.GetRealWorkerAddr(jobCtx.InstanceMasterActorPath())
	mapMasterPid := actorcomm.GetMapMasterPid(workerAddr)
	if mapMasterPid == nil {
		errMsg := fmt.Sprintf("%v%v", "get taskMaster akka path error, path=", jobCtx.InstanceMasterActorPath())
		// Log through a constant format so '%' in the path cannot garble the output.
		logger.Errorf("%s", errMsg)
		result.SetResult(errMsg)
		return result, nil
	}

	batchSize := int(config.GetWorkerConfig().WorkerMapPageSize())
	if batchSize <= 0 {
		// A non-positive page size would otherwise panic below
		// (division by zero or negative slice capacity).
		return nil, fmt.Errorf("invalid worker map page size=%d, must be positive", batchSize)
	}
	size := len(taskList)
	// Ceiling division: map taskList in batchNumber batches, each holding at
	// most batchSize tasks.
	batchNumber := (size + batchSize - 1) / batchSize
	logger.Infof("map task list, jobInstanceId=%d, taskName=%s, size=%d, batchSize=%d, batchNumber=%d",
		jobCtx.JobInstanceId(), taskName, size, batchSize, batchNumber)

	reqs := make([]*schedulerx.WorkerMapTaskRequest, batchNumber)
	for i := range reqs {
		reqs[i] = new(schedulerx.WorkerMapTaskRequest)
	}

	maxTaskBodySize := int(config.GetWorkerConfig().TaskBodySizeMax())
	for i, task := range taskList {
		rcvr.checkTaskObject(jobCtx, task)

		taskBody, err := json.Marshal(task)
		if err != nil {
			return nil, fmt.Errorf("json marshal task=%+v failed, err=%w", task, err)
		}
		if len(taskBody) > maxTaskBodySize {
			return nil, fmt.Errorf("taskBody size more than %dB", maxTaskBodySize)
		}
		// Tasks are assigned to batches in order; append handles a nil TaskBody.
		req := reqs[i/batchSize]
		req.TaskBody = append(req.TaskBody, taskBody)
	}

	for i, req := range reqs {
		req.JobId = proto.Int64(jobCtx.JobId())
		req.JobInstanceId = proto.Int64(jobCtx.JobInstanceId())
		req.TaskId = proto.Int64(jobCtx.TaskId())
		req.TaskName = proto.String(taskName)

		var (
			resp *schedulerx.WorkerMapTaskResponse
			err  error
		)

		taskMaster := rcvr.taskMasterPool.Get(req.GetJobInstanceId())
		if isMapTaskMaster(taskMaster) {
			// current worker is master worker
			resp, err = rcvr.handleMapTask(jobCtx, taskMaster, req)
		} else {
			// current worker isn't master worker, forward request to master worker
			var ret interface{}
			ret, err = rcvr.actorSystem.Root.RequestFuture(mapMasterPid, req, 30*time.Second).Result()
			if errors.Is(err, actor.ErrTimeout) {
				logger.Warnf("JobInstanceId=%d WorkerMapTaskRequest dispatch failed, due to send request=%+v to taskMaster timeout.", req.GetJobInstanceId(), req)
				retryCount := 0
				// Stop at the first successful retry: continuing would re-send a
				// batch the master already accepted (duplicate map tasks).
				for err != nil && retryCount < maxRetryCount {
					time.Sleep(10 * time.Millisecond)
					ret, err = rcvr.actorSystem.Root.RequestFuture(mapMasterPid, req, 30*time.Second).Result()
					retryCount++
				}
				if err != nil {
					return nil, fmt.Errorf("JobInstanceId=%d WorkerMapTaskRequest dispatch failed, due to send request=%+v to taskMaster timeout after retry exceed %d times, err=%w ", req.GetJobInstanceId(), req, retryCount, err)
				}
			}
			if err == nil {
				var ok bool
				if resp, ok = ret.(*schedulerx.WorkerMapTaskResponse); !ok {
					err = fmt.Errorf("Response send request=%+v to taskMaster is not WorkerMapTaskResponse, response=%+v ", req, ret)
				}
			}
		}

		if err != nil {
			return nil, fmt.Errorf("JobInstanceId=%d WorkerMapTaskRequest dispatch error, due to send request=%+v to taskMaster failed, err=%w ", req.GetJobInstanceId(), req, err)
		}

		if !resp.GetSuccess() {
			// Constant format: the master's message may contain '%'.
			logger.Errorf("%s", resp.GetMessage())
			result.SetResult(resp.GetMessage())
			return result, nil
		}

		// Release the dispatched batch so its task bodies can be collected
		// while the remaining batches are sent.
		reqs[i] = nil

		if resp.GetOverload() {
			logger.Warnf("Task Master is busy, sleeping a while 10s...")
			time.Sleep(10 * time.Second)
		}
	}
	result.SetSucceed()
	return result, nil
}