// internal/master/sharding_task_master.go

/*
 * Copyright (c) 2023 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package master

import (
	"context"
	"encoding/json"
	"fmt"
	"regexp"
	"strconv"
	"strings"
	"sync"

	"github.com/asynkron/protoactor-go/actor"
	"google.golang.org/protobuf/proto"

	actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
	"github.com/alibaba/schedulerx-worker-go/internal/common"
	"github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster"
	"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
	"github.com/alibaba/schedulerx-worker-go/internal/utils"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

var _ taskmaster.MapTaskMaster = &ShardingTaskMaster{}

// ShardingTaskMaster dispatches one container per sharding parameter
// ("shardingId=parameter") and tracks per-sharding status on top of the
// embedded GridTaskMaster.
type ShardingTaskMaster struct {
	*GridTaskMaster

	actorCtx              actor.Context
	parameters            []string
	shardingTaskStatusMap *sync.Map // Map<Long, ShardingTaskStatus>
}

func NewShardingTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) *ShardingTaskMaster {
	return &ShardingTaskMaster{
		GridTaskMaster:        NewGridTaskMaster(jobInstanceInfo, actorCtx),
		actorCtx:              actorCtx,
		shardingTaskStatusMap: new(sync.Map),
	}
}

// SubmitInstance parses the sharding parameters, builds one container start
// request per sharding, and dispatches them in a batch.
func (m *ShardingTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error {
	if err := m.parseShardingParameters(jobInstanceInfo); err != nil {
		m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, err.Error())
		return err
	}
	shardingNum := len(m.parameters)
	startContainerRequests := make([]*schedulerx.MasterStartContainerRequest, 0, shardingNum)
	for _, param := range m.parameters {
		// Each parameter must look like "shardingId=shardingParameter".
		tokens := strings.Split(param, "=")
		if len(tokens) != 2 {
			errMsg := "invalid sharding parameters, should be like 0=a,1=b,2=c"
			m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
			return fmt.Errorf("%s", errMsg)
		}
		shardingId, err := strconv.Atoi(tokens[0])
		if err != nil {
			errMsg := fmt.Sprintf("invalid sharding parameters, shardingId is not a digit, shardingId=%s", tokens[0])
			m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
			return fmt.Errorf("%s", errMsg)
		}
		taskName := tokens[0] // taskName == shardingId
		shardingParameter := tokens[1]
		if _, ok := m.taskProgressMap.Load(taskName); ok {
			errMsg := fmt.Sprintf("shardingId=%s is duplicated", taskName)
			m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
			return fmt.Errorf("%s", errMsg)
		}
		task := common.NewShardingTask(int64(shardingId), shardingParameter)
		taskObj, err := json.Marshal(task)
		if err != nil {
			errMsg := fmt.Sprintf("json marshal task failed, err=%s, task=%+v", err.Error(), task)
			m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
			return fmt.Errorf("%s", errMsg)
		}
		req, err := m.convert2StartContainerRequest(jobInstanceInfo, int64(shardingId), taskName, taskObj, false)
		if err != nil {
			errMsg := fmt.Sprintf("convert2StartContainerRequest failed, err=%s, jobInstanceInfo=%+v", err.Error(), jobInstanceInfo)
			m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, errMsg)
			return fmt.Errorf("%s", errMsg)
		}
		req.ShardingNum = proto.Int32(int32(shardingNum))
		startContainerRequests = append(startContainerRequests, req)
	}
	m.startBatchHandler()
	m.BatchDispatchTasks(startContainerRequests, "")
	m.init()
	return nil
}
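// shardingParseExample is a hedged, illustrative sketch and is not called by
// this package: it condenses the split/validate/dedupe steps that
// SubmitInstance applies to each "shardingId=parameter" entry, making the
// expected input shape ("0=a,1=b,2=c", also separable by '\n' or '\r') easy
// to see in one place. The function name and map return type are assumptions
// made for this example only.
func shardingParseExample(raw string) (map[int64]string, error) {
	byId := make(map[int64]string)
	for _, entry := range regexp.MustCompile(",|\n|\r").Split(raw, -1) {
		tokens := strings.Split(entry, "=")
		if len(tokens) != 2 {
			return nil, fmt.Errorf("invalid sharding parameters, should be like 0=a,1=b,2=c")
		}
		shardingId, err := strconv.ParseInt(tokens[0], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("shardingId is not a digit, shardingId=%s", tokens[0])
		}
		if _, ok := byId[shardingId]; ok {
			return nil, fmt.Errorf("shardingId=%d is duplicated", shardingId)
		}
		byId[shardingId] = tokens[1]
	}
	return byId, nil
}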
// parseShardingParameters resolves the raw parameter string and splits it on
// ',', '\n', or '\r' into individual sharding entries.
func (m *ShardingTaskMaster) parseShardingParameters(jobInstanceInfo *common.JobInstanceInfo) error {
	// For sharding tasks inside a workflow, the sharding parameters are those
	// configured on the original job; for standalone instances, instance
	// parameters (if present) take precedence.
	shardingParameters := jobInstanceInfo.GetParameters()
	if jobInstanceInfo.GetWfInstanceId() == 0 && jobInstanceInfo.GetInstanceParameters() != "" {
		shardingParameters = jobInstanceInfo.GetInstanceParameters()
	}
	if shardingParameters == "" {
		return fmt.Errorf("sharding parameters are empty")
	}
	reg, err := regexp.Compile(",|\n|\r")
	if err != nil {
		return err
	}
	m.parameters = reg.Split(shardingParameters, -1)
	return nil
}

// BatchUpdateTaskStatues forwards the status reports to the embedded
// GridTaskMaster, then keeps the per-sharding status map in sync.
func (m *ShardingTaskMaster) BatchUpdateTaskStatues(requests []*schedulerx.ContainerReportTaskStatusRequest) {
	m.GridTaskMaster.BatchUpdateTaskStatues(requests)
	for _, req := range requests {
		var (
			taskId     = req.GetTaskId()
			taskStatus = req.GetStatus()
			workerAddr = req.GetWorkerAddr()
		)
		if existedShardingTaskStatus, ok := m.shardingTaskStatusMap.Load(taskId); ok {
			existedShardingTaskStatus.(*taskstatus.ShardingTaskStatus).SetStatus(taskStatus)
		} else {
			m.shardingTaskStatusMap.Store(taskId, taskstatus.NewShardingTaskStatus(taskId, workerAddr, taskStatus))
		}
	}
}

// BatchHandlePulledProgress assigns each container start request to a worker
// and groups the requests per worker, separating failover requests from
// normal ones. It also updates the task/worker progress counters and seeds
// the per-sharding status map.
func (m *ShardingTaskMaster) BatchHandlePulledProgress(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest, remoteWorker string) (map[string][]*schedulerx.MasterStartContainerRequest, map[string][]*schedulerx.MasterStartContainerRequest) {
	var (
		worker2ReqsWithNormal   = make(map[string][]*schedulerx.MasterStartContainerRequest)
		worker2ReqsWithFailover = make(map[string][]*schedulerx.MasterStartContainerRequest)
	)
	for _, request := range masterStartContainerRequests {
		workerIdAddr := remoteWorker
		if workerIdAddr == "" {
			workerIdAddr = m.selectWorker()
		}
		if workerIdAddr == "" {
			m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(), processor.InstanceStatusFailed, "all workers are down!")
			break
		}
		workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
		if request.GetFailover() {
			worker2ReqsWithFailover[workerIdAddr] = append(worker2ReqsWithFailover[workerIdAddr], request)
		} else {
			worker2ReqsWithNormal[workerIdAddr] = append(worker2ReqsWithNormal[workerIdAddr], request)
			if counter, ok := m.taskProgressMap.Load(request.GetTaskName()); ok {
				counter.(*common.TaskProgressCounter).IncrementOnePulled()
			}
		}
		if workerAddr != "" {
			m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
		}
		if val, ok := m.workerProgressMap.Load(workerAddr); ok {
			val.(*common.WorkerProgressCounter).IncrementTotal()
			val.(*common.WorkerProgressCounter).IncrementPulled()
		}
		m.shardingTaskStatusMap.Store(request.GetTaskId(), taskstatus.NewShardingTaskStatus(request.GetTaskId(), workerAddr, int32(taskstatus.TaskStatusInit)))
	}
	return worker2ReqsWithNormal, worker2ReqsWithFailover
}
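// dispatchGroupingExample is a hedged, illustrative sketch and is not called
// by this package: it isolates the bucketing pattern used by
// BatchHandlePulledProgress above, grouping container requests per worker and
// keeping failover requests in a separate map. The pickWorker callback is an
// assumption standing in for selectWorker, and the progress-counter updates
// are omitted.
func dispatchGroupingExample(reqs []*schedulerx.MasterStartContainerRequest, pickWorker func() string) (normal, failover map[string][]*schedulerx.MasterStartContainerRequest) {
	normal = make(map[string][]*schedulerx.MasterStartContainerRequest)
	failover = make(map[string][]*schedulerx.MasterStartContainerRequest)
	for _, req := range reqs {
		worker := pickWorker()
		if worker == "" {
			break // no alive worker left to dispatch to
		}
		// append handles a missing key: the zero-value nil slice grows in place.
		if req.GetFailover() {
			failover[worker] = append(failover[worker], req)
		} else {
			normal[worker] = append(normal[worker], req)
		}
	}
	return normal, failover
}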
// GetJobInstanceProgress serializes the current sharding statuses as a JSON
// detail string.
func (m *ShardingTaskMaster) GetJobInstanceProgress() (string, error) {
	shardingTaskStatusList := make([]*taskstatus.ShardingTaskStatus, 0, utils.SyncMapLen(m.shardingTaskStatusMap))
	m.shardingTaskStatusMap.Range(func(shardingId, shardingTaskStatus any) bool {
		shardingTaskStatusList = append(shardingTaskStatusList, shardingTaskStatus.(*taskstatus.ShardingTaskStatus))
		return true
	})
	detail := taskstatus.NewShardingTaskProgress(shardingTaskStatusList)
	data, err := json.Marshal(detail)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func (m *ShardingTaskMaster) CheckProcessor() {
	// nothing to do
}

// PostFinish clears the persisted tasks of the finished instance; a failed
// cleanup yields a nil result instead of a success.
func (m *ShardingTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult {
	if err := m.taskPersistence.ClearTasks(jobInstanceId); err != nil {
		return nil
	}
	return processor.NewProcessResult(processor.WithSucceed())
}

// Clear releases the base task master state and drops the sharding status map.
func (m *ShardingTaskMaster) Clear(taskMaster taskmaster.TaskMaster) {
	m.GridTaskMaster.Clear(taskMaster)
	m.shardingTaskStatusMap = nil
}
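// progressSnapshotExample is a hedged, illustrative sketch and is not called
// by this package: it shows how a caller might consume GetJobInstanceProgress,
// which returns the sharding statuses serialized as a JSON string. Printing to
// stdout is an assumption for the example; real callers report the detail back
// to the server.
func progressSnapshotExample(m *ShardingTaskMaster) {
	detail, err := m.GetJobInstanceProgress()
	if err != nil {
		fmt.Printf("get sharding progress failed: %v\n", err)
		return
	}
	fmt.Printf("sharding progress detail: %s\n", detail)
}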