internal/master/parallel_task_master.go
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package master

import (
    "encoding/json"
    "fmt"
    "time"

    "github.com/asynkron/protoactor-go/actor"
    "go.uber.org/atomic"
    "google.golang.org/protobuf/proto"

    "github.com/alibaba/schedulerx-worker-go/config"
    "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
    "github.com/alibaba/schedulerx-worker-go/internal/batch"
    "github.com/alibaba/schedulerx-worker-go/internal/common"
    "github.com/alibaba/schedulerx-worker-go/internal/constants"
    "github.com/alibaba/schedulerx-worker-go/internal/discovery"
    "github.com/alibaba/schedulerx-worker-go/internal/master/persistence"
    "github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster"
    "github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
    "github.com/alibaba/schedulerx-worker-go/internal/utils"
    "github.com/alibaba/schedulerx-worker-go/logger"
    "github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    "github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

var _ taskmaster.ParallelTaskMaster = &ParallelTaskMaster{}

// batchSize is the base number of subtasks per dispatch batch; the effective
// handler capacity scales with the number of workers in the job instance.
const batchSize = 256

// ParallelTaskMaster is a MapTaskMaster that persists subtask state on the
// server side via persistence.ServerTaskPersistence.
type ParallelTaskMaster struct {
    *MapTaskMaster
    actorCtx actor.Context
}
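
// NewParallelTaskMaster builds a ParallelTaskMaster on top of a MapTaskMaster,
// wiring up server-side task persistence, the status-report and dispatch
// queues, and a pull- or push-mode dispatch handler depending on the job's
// xattrs.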
func NewParallelTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) *ParallelTaskMaster {
    jobInstanceId := jobInstanceInfo.GetJobInstanceId()
    parallelTaskMaster := &ParallelTaskMaster{
        actorCtx:      actorCtx,
        MapTaskMaster: NewMapTaskMaster(jobInstanceInfo, actorCtx).(*MapTaskMaster),
    }
    parallelTaskMaster.taskPersistence = persistence.NewServerTaskPersistence(jobInstanceInfo.GetGroupId())
    parallelTaskMaster.taskStatusReqQueue = batch.NewReqQueue(1024)
    parallelTaskMaster.taskStatusReqBatchHandler = batch.NewTMStatusReqHandler(jobInstanceId, 1, 1, batchSize*2*int32(len(jobInstanceInfo.GetAllWorkers())), parallelTaskMaster.taskStatusReqQueue)
    parallelTaskMaster.taskBlockingQueue = batch.NewReqQueue(batchSize * 4)
    if jobInstanceInfo.GetXattrs() != "" {
        parallelTaskMaster.xAttrs = new(common.MapTaskXAttrs)
        if err := json.Unmarshal([]byte(jobInstanceInfo.GetXattrs()), parallelTaskMaster.xAttrs); err != nil {
            logger.Errorf("Unmarshal xAttrs failed, err=%s", err.Error())
        }
    }
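    // Pick the dispatch handler according to the task dispatch mode from the
    // xattrs: in pull mode workers fetch subtasks from the master, otherwise
    // the master pushes batches of subtasks to the workers.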
    if parallelTaskMaster.xAttrs != nil && parallelTaskMaster.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePull) {
        parallelTaskMaster.taskDispatchReqHandler = batch.NewTaskPullReqHandler(jobInstanceId, 1, 2, batchSize*int32(len(jobInstanceInfo.GetAllWorkers())), parallelTaskMaster.taskBlockingQueue)
    } else {
        curBatchSize := batchSize * len(jobInstanceInfo.GetAllWorkers())
        // FIXME implement it
        //if (isWorkerLoadRouter()) {
        //    batchSize = 2 * jobInstanceInfo.getAllWorkers().size();
        //}
        //Long dispatchDelay = parseDispatchSpeed();
        //if (dispatchDelay != null) {
        //    batchSize = 1;
        //}
        parallelTaskMaster.taskDispatchReqHandler = batch.NewTaskPushReqHandler(jobInstanceId, 1, 2, int32(curBatchSize), parallelTaskMaster.taskBlockingQueue, batchSize)
    }
    // Seed the taskId generator with the current timestamp so that retried
    // subtasks in a second run do not collide with taskIds from the first run.
    parallelTaskMaster.taskIdGenerator = atomic.NewInt64(time.Now().Unix())
    return parallelTaskMaster
}
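
// Map hands a batch of subtasks to the underlying MapTaskMaster for dispatch,
// first enforcing the configured upper bound on the total number of parallel
// subtasks for this job instance.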
func (m *ParallelTaskMaster) Map(jobCtx *jobcontext.JobContext, taskList [][]byte, taskName string) (bool, error) {
    if len(taskList) == 0 {
        logger.Warnf("map taskList is empty, taskName=%s", taskName)
        return false, nil
    }
    logger.Infof("map taskList, jobInstanceId=%v, taskName=%v, taskList size=%v", m.jobInstanceInfo.GetJobInstanceId(), taskName, len(taskList))
    counter := m.taskCounter.Add(int64(len(taskList)))
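
    // Resolve the per-instance subtask limit: the worker config takes
    // precedence; if it is unset, fall back to the default limit (advanced
    // version groups get their own default).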
    defaultTaskMaxSize := constants.ParallelTaskListSizeMaxDefault
    if discovery.GetGroupManager().IsAdvancedVersion(m.jobInstanceInfo.GetGroupId()) {
        defaultTaskMaxSize = constants.ParallelTaskListSizeMaxAdvanced
    }
    parallelTaskMaxSize := config.GetWorkerConfig().WorkerParallelTaskMaxSize()
    if parallelTaskMaxSize == 0 {
        parallelTaskMaxSize = int32(defaultTaskMaxSize)
    }
    if counter > int64(parallelTaskMaxSize) {
        return false, fmt.Errorf("jobInstanceId=%v, task counter=%v exceeds the max parallel task size %v", m.jobInstanceInfo.GetJobInstanceId(), counter, parallelTaskMaxSize)
    }
    return m.MapTaskMaster.Map(jobCtx, taskList, taskName)
}
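
// RetryTasks resets the given subtasks to INIT on the server so they can be
// dispatched again. Root tasks are skipped because they cannot be retried; if
// the master has not been initialized yet, its batch handlers are restarted
// before the progress counters are rebuilt.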
func (m *ParallelTaskMaster) RetryTasks(taskEntities []*schedulerx.RetryTaskEntity) {
    // Collect the taskIds whose status will be reset to INIT; root tasks cannot be retried
    taskIdList := make([]int64, 0, len(taskEntities))
    for _, taskEntity := range taskEntities {
        if utils.IsRootTask(taskEntity.GetTaskName()) {
            logger.Warnf("root task can't be retried, taskName=%s", taskEntity.GetTaskName())
        } else {
            taskIdList = append(taskIdList, taskEntity.GetTaskId())
        }
    }
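
    // Report the collected taskIds to the server as a single batch status
    // update and wait for the acknowledgement below.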
    req := &schedulerx.WorkerReportTaskListStatusRequest{
        JobInstanceId: proto.Int64(m.jobInstanceInfo.GetJobInstanceId()),
        TaskId:        taskIdList,
        Status:        proto.Int32(int32(taskstatus.TaskStatusInit)),
    }
    // The master sends the request to the server
    actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
        Msg: req,
    }
    // Wait up to 30 seconds for the server's response
    timer := time.NewTimer(30 * time.Second)
    defer timer.Stop()
    select {
    case resp := <-actorcomm.WorkerBatchUpdateTaskStatusRespMsgSender():
        if resp.GetSuccess() {
            if !m.IsInited() {
                // The master has not been initialized yet: start the batch
                // handlers, initialize it, and rebuild the per-task progress
                // counters.
                m.startBatchHandler()
                m.init()
                for _, taskEntity := range taskEntities {
                    taskName := taskEntity.GetTaskName()
                    if counter, ok := m.taskProgressMap.LoadOrStore(taskName, common.NewTaskProgressCounter(taskName)); ok {
                        counter.(*common.TaskProgressCounter).IncrementOneTotal()
                    }
                }
            } else {
                // Roll back the progress counters for the old status of every
                // retried subtask so the counters stay consistent once the
                // subtasks run again.
                for _, taskEntity := range taskEntities {
                    taskName := taskEntity.GetTaskName()
                    workerAddr := taskEntity.GetWorkerAddr()
                    oldStatus := taskEntity.GetOldStatus()
                    if taskProgressCounter, ok := m.taskProgressMap.Load(taskName); ok {
                        switch taskstatus.TaskStatus(oldStatus) {
                        case taskstatus.TaskStatusSucceed:
                            taskProgressCounter.(*common.TaskProgressCounter).DecrementSuccess()
                        case taskstatus.TaskStatusFailed:
                            taskProgressCounter.(*common.TaskProgressCounter).DecrementFailed()
                        }
                    }
                    if workerProgressCounter, ok := m.workerProgressMap.Load(workerAddr); ok {
                        switch taskstatus.TaskStatus(oldStatus) {
                        case taskstatus.TaskStatusSucceed:
                            workerProgressCounter.(*common.WorkerProgressCounter).DecrementSuccess()
                        case taskstatus.TaskStatusFailed:
                            workerProgressCounter.(*common.WorkerProgressCounter).DecrementFailed()
                        }
                    }
                }
            }
        } else {
            logger.Errorf("RetryTasks in ParallelTaskMaster failed, jobInstanceId=%d, errMsg=%s", m.jobInstanceInfo.GetJobInstanceId(), resp.GetMessage())
            // TODO: if the report fails, another server should be tried
        }
    case <-timer.C:
        logger.Errorf("RetryTasks in ParallelTaskMaster timeout, jobInstanceId=%d", m.jobInstanceInfo.GetJobInstanceId())
    }
}