// processor/mapjob/map_job_processor.go
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mapjob
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/asynkron/protoactor-go/actor"
"google.golang.org/protobuf/proto"
"github.com/alibaba/schedulerx-worker-go/config"
"github.com/alibaba/schedulerx-worker-go/internal/actor/common"
"github.com/alibaba/schedulerx-worker-go/internal/constants"
"github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster"
"github.com/alibaba/schedulerx-worker-go/internal/masterpool"
"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
"github.com/alibaba/schedulerx-worker-go/logger"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
"github.com/alibaba/schedulerx-worker-go/processor/mapjob/bizsubtask"
)
// Compile-time check that *MapJobProcessor satisfies processor.MapJobProcessor.
var _ processor.MapJobProcessor = (*MapJobProcessor)(nil)

// maxRetryCount is the number of extra attempts Map makes to forward a batch
// to a remote task master after the first request has timed out.
const maxRetryCount = 3
// MapJobProcessor distributes the subtasks of a map job: Map packs a task list
// into batches and hands each batch to the task master of the job instance,
// either in-process or through the actor system.
type MapJobProcessor struct {
	// Embedded processor; NewMapJobProcessor leaves it nil.
	processor.Processor
	// taskMasterPool is consulted to find a task master hosted by this worker.
	taskMasterPool *masterpool.TaskMasterPool
	// actorSystem is used to forward map requests to a task master that is
	// not hosted by this worker.
	actorSystem *actor.ActorSystem
}
// NewMapJobProcessor builds a MapJobProcessor wired to the shared task master
// pool and actor system. The embedded processor.Processor is left nil.
func NewMapJobProcessor() *MapJobProcessor {
	p := new(MapJobProcessor)
	p.taskMasterPool = masterpool.GetTaskMasterPool()
	p.actorSystem = actorcomm.GetActorSystem()
	return p
}
// checkTaskObject validates a subtask object before it is dispatched.
//
// Validation applies only to bizsubtask.BizSubTask objects, and only when the
// advanced version is enabled (currently hard-coded to false, see FIXME).
// In that case the label map may hold at most 3 entries, with every key at most
// 60 bytes and every value at most 180 bytes long; otherwise an error is returned.
func (rcvr *MapJobProcessor) checkTaskObject(jobCtx *jobcontext.JobContext, taskObject interface{}) error {
	// FIXME: port the advanced-version lookup, originally:
	// context := ContainerFactory.getContainerPool().getContext()
	// isAdvancedVersion := GroupManager.INSTANCE.isAdvancedVersion(context.getGroupId())
	isAdvancedVersion := false
	if !isAdvancedVersion {
		return nil
	}

	bizSubTask, isBizSubTask := taskObject.(bizsubtask.BizSubTask)
	if !isBizSubTask {
		return nil
	}

	labels := bizSubTask.LabelMap()
	if len(labels) > 3 {
		return fmt.Errorf("label map size can't beyond 3")
	}
	for k, v := range labels {
		if len(k) <= 60 && len(v) <= 180 {
			continue
		}
		logger.Errorf("Job instance=%d label map<%s, %s> content can't beyond max size(60,180).", jobCtx.JobInstanceId(), k, v)
		return fmt.Errorf("label map content can't beyond max size(60,180)")
	}
	return nil
}
// handleMapTask hands one batch of subtasks to a task master hosted by the
// current worker and reports the outcome.
//
//   - A nil taskMaster yields an unsuccessful response (nil error).
//   - A taskMaster that is not a taskmaster.MapTaskMaster marks the instance
//     failed and yields an unsuccessful response (nil error).
//   - If MapTaskMaster.Map fails, the instance is marked failed and the Map
//     error is returned with a nil response.
//   - Whenever marking the instance failed itself fails, an unsuccessful
//     response carrying that failure is returned instead (nil error).
//   - On success the response reports Success=true and the Overload flag
//     returned by Map.
func (rcvr *MapJobProcessor) handleMapTask(jobCtx *jobcontext.JobContext, taskMaster taskmaster.TaskMaster, request *schedulerx.WorkerMapTaskRequest) (*schedulerx.WorkerMapTaskResponse, error) {
	jobInstanceId := request.GetJobInstanceId()
	if taskMaster == nil {
		return &schedulerx.WorkerMapTaskResponse{
			Success: proto.Bool(false),
			Message: proto.String(fmt.Sprintf("can't found TaskMaster by jobInstanceId=%d", jobInstanceId)),
		}, nil
	}

	// markFailed sets the instance status to failed. It returns a non-nil
	// response only when that status update itself fails.
	markFailed := func(reason string) *schedulerx.WorkerMapTaskResponse {
		updateErr := taskMaster.UpdateNewInstanceStatus(taskMaster.GetSerialNum(), processor.InstanceStatusFailed, reason)
		if updateErr == nil {
			return nil
		}
		errMsg := fmt.Sprintf("jobInstanceId=%d, UpdateNewInstanceStatus failed, err=%s", jobInstanceId, updateErr.Error())
		logger.Errorf("%s", errMsg)
		return &schedulerx.WorkerMapTaskResponse{
			Success: proto.Bool(false),
			Message: proto.String(errMsg),
		}
	}

	mapTaskMaster, ok := taskMaster.(taskmaster.MapTaskMaster)
	if !ok {
		const reason = "TaskMaster is not MapTaskMaster"
		if failResp := markFailed(reason); failResp != nil {
			return failResp, nil
		}
		return &schedulerx.WorkerMapTaskResponse{
			Success: proto.Bool(false),
			Message: proto.String(reason),
		}, nil
	}

	startTime := time.Now()
	overload, err := mapTaskMaster.Map(jobCtx, request.GetTaskBody(), request.GetTaskName())
	if err != nil {
		logger.Errorf("jobInstanceId=%d map failed, err=%s", jobInstanceId, err.Error())
		if failResp := markFailed(err.Error()); failResp != nil {
			return failResp, nil
		}
		return nil, err
	}
	logger.Debugf("jobInstanceId=%d map, cost=%dms", jobInstanceId, time.Since(startTime).Milliseconds())
	return &schedulerx.WorkerMapTaskResponse{
		Success:  proto.Bool(true),
		Overload: proto.Bool(overload),
	}, nil
}
// IsRootTask reports whether jobCtx refers to the root task of a map job,
// i.e. whether its task name equals constants.MapTaskRootName.
func (rcvr *MapJobProcessor) IsRootTask(jobCtx *jobcontext.JobContext) bool {
	taskName := jobCtx.TaskName()
	return constants.MapTaskRootName == taskName
}
// Map distributes taskList to the task master of the job instance, which
// dispatches the subtasks to the workers.
//
// Every element of taskList is validated with checkTaskObject and JSON-encoded;
// an encoded element must not exceed the configured TaskBodySizeMax bytes.
// Encoded tasks are packed, in order, into batches of at most WorkerMapPageSize
// tasks, and the batches are delivered one at a time. When the master reports
// overload, Map pauses 10s before sending the next batch.
//
// Results:
//   - a failed ProcessResult (nil error) when taskList is empty, the master
//     actor cannot be resolved, or the master rejects a batch;
//   - a nil result and a non-nil error when a task fails validation or
//     encoding, is too large, the page size is not positive, or a batch
//     cannot be delivered;
//   - a succeeded ProcessResult once every batch has been accepted.
func (rcvr *MapJobProcessor) Map(jobCtx *jobcontext.JobContext, taskList []interface{}, taskName string) (*processor.ProcessResult, error) {
	result := processor.NewProcessResult(processor.WithFailed())
	if len(taskList) == 0 {
		result.SetResult("task list is empty")
		return result, nil
	}

	workerAddr := actorcomm.GetRealWorkerAddr(jobCtx.InstanceMasterActorPath())
	mapMasterPid := actorcomm.GetMapMasterPid(workerAddr)
	if mapMasterPid == nil {
		errMsg := fmt.Sprintf("get taskMaster akka path error, path=%v", jobCtx.InstanceMasterActorPath())
		logger.Errorf("%s", errMsg)
		result.SetResult(errMsg)
		return result, nil
	}

	batchSize := int(config.GetWorkerConfig().WorkerMapPageSize())
	if batchSize <= 0 {
		// Guard against a divide-by-zero panic on a misconfigured page size.
		return nil, fmt.Errorf("invalid WorkerMapPageSize=%d, must be greater than 0", batchSize)
	}
	size := len(taskList)
	// Ceiling division: every batch holds at most batchSize tasks.
	batchNumber := (size + batchSize - 1) / batchSize
	logger.Infof("map task list, jobInstanceId=%d, taskName=%s, size=%d, batchSize=%d, batchNumber=%d",
		jobCtx.JobInstanceId(), taskName, size, batchSize, batchNumber)

	reqs, err := rcvr.buildMapTaskRequests(jobCtx, taskList, taskName, batchSize)
	if err != nil {
		return nil, err
	}

	for i, req := range reqs {
		resp, err := rcvr.dispatchMapTaskRequest(jobCtx, mapMasterPid, req)
		if err != nil {
			return nil, err
		}
		if !resp.GetSuccess() {
			logger.Errorf("%s", resp.GetMessage())
			result.SetResult(resp.GetMessage())
			return result, nil
		}
		// Drop the reference to the delivered batch so its task bodies can be
		// garbage collected while later batches are still being sent.
		reqs[i] = nil
		if resp.GetOverload() {
			logger.Warnf("Task Master is busy, sleeping a while 10s...")
			time.Sleep(10 * time.Second)
		}
	}
	result.SetSucceed()
	return result, nil
}

// buildMapTaskRequests validates and JSON-encodes every element of taskList and
// packs the encoded bodies, in order, into requests of at most batchSize tasks
// each. batchSize must be positive.
func (rcvr *MapJobProcessor) buildMapTaskRequests(jobCtx *jobcontext.JobContext, taskList []interface{}, taskName string, batchSize int) ([]*schedulerx.WorkerMapTaskRequest, error) {
	maxTaskBodySize := int(config.GetWorkerConfig().TaskBodySizeMax())
	reqs := make([]*schedulerx.WorkerMapTaskRequest, 0, (len(taskList)+batchSize-1)/batchSize)
	for i, task := range taskList {
		if err := rcvr.checkTaskObject(jobCtx, task); err != nil {
			return nil, fmt.Errorf("check task=%+v failed, err=%w", task, err)
		}
		taskBody, err := json.Marshal(task)
		if err != nil {
			return nil, fmt.Errorf("json marshal task=%+v failed, err=%w", task, err)
		}
		if len(taskBody) > maxTaskBodySize {
			return nil, fmt.Errorf("taskBody size more than %dB", maxTaskBodySize)
		}
		// Start a new request at every batch boundary.
		if i%batchSize == 0 {
			reqs = append(reqs, &schedulerx.WorkerMapTaskRequest{
				JobId:         proto.Int64(jobCtx.JobId()),
				JobInstanceId: proto.Int64(jobCtx.JobInstanceId()),
				TaskId:        proto.Int64(jobCtx.TaskId()),
				TaskName:      proto.String(taskName),
			})
		}
		current := reqs[len(reqs)-1]
		current.TaskBody = append(current.TaskBody, taskBody)
	}
	return reqs, nil
}

// dispatchMapTaskRequest delivers one batch to the task master of the job
// instance and returns the master's response.
//
// If the current worker hosts a MapTaskMaster for the instance, the batch is
// handled in-process via handleMapTask. Otherwise it is sent to mapMasterPid;
// after a timeout the request is retried up to maxRetryCount more times,
// stopping at the first attempt that succeeds.
func (rcvr *MapJobProcessor) dispatchMapTaskRequest(jobCtx *jobcontext.JobContext, mapMasterPid *actor.PID, req *schedulerx.WorkerMapTaskRequest) (*schedulerx.WorkerMapTaskResponse, error) {
	const (
		requestTimeout = 30 * time.Second
		retryInterval  = 10 * time.Millisecond
	)
	jobInstanceId := req.GetJobInstanceId()
	var (
		resp *schedulerx.WorkerMapTaskResponse
		err  error
	)
	if taskMaster := rcvr.taskMasterPool.Get(jobInstanceId); isMapTaskMaster(taskMaster) {
		// Current worker is the master worker: hand the batch over in-process.
		resp, err = rcvr.handleMapTask(jobCtx, taskMaster, req)
	} else {
		// Current worker isn't the master worker: forward the batch to it.
		var ret interface{}
		ret, err = rcvr.actorSystem.Root.RequestFuture(mapMasterPid, req, requestTimeout).Result()
		if errors.Is(err, actor.ErrTimeout) {
			logger.Warnf("JobInstanceId=%d WorkerMapTaskRequest dispatch failed, due to send request=%+v to taskMaster timeout.", jobInstanceId, req)
			retryCount := 0
			// Stop at the first successful attempt: re-sending after a success
			// would dispatch the same subtasks more than once.
			for err != nil && retryCount < maxRetryCount {
				time.Sleep(retryInterval)
				ret, err = rcvr.actorSystem.Root.RequestFuture(mapMasterPid, req, requestTimeout).Result()
				retryCount++
			}
			if err != nil {
				return nil, fmt.Errorf("JobInstanceId=%d WorkerMapTaskRequest dispatch failed, due to send request=%+v to taskMaster timeout after retry exceed %d times, err=%w ", jobInstanceId, req, retryCount, err)
			}
		}
		if err == nil {
			var ok bool
			if resp, ok = ret.(*schedulerx.WorkerMapTaskResponse); !ok {
				err = fmt.Errorf("Response send request=%+v to taskMaster is not WorkerMapTaskResponse, response=%+v ", req, ret)
			}
		}
	}
	if err != nil {
		return nil, fmt.Errorf("JobInstanceId=%d WorkerMapTaskRequest dispatch error, due to send request=%+v to taskMaster failed, err=%w ", jobInstanceId, req, err)
	}
	return resp, nil
}
// Kill is not implemented for MapJobProcessor.
//
// It reports this through the returned error instead of panicking, so that a
// kill request cannot crash the worker. Processors that support killing need
// to provide their own Kill.
func (rcvr *MapJobProcessor) Kill(jobCtx *jobcontext.JobContext) error {
	return errors.New("kill is not implemented by MapJobProcessor")
}
// isMapTaskMaster reports whether taskMaster is non-nil and implements
// taskmaster.MapTaskMaster.
func isMapTaskMaster(taskMaster taskmaster.TaskMaster) bool {
	// A comma-ok assertion on a nil interface value yields false, so the nil
	// case needs no separate check.
	_, isMap := taskMaster.(taskmaster.MapTaskMaster)
	return isMap
}