internal/master/task_master.go

/*
 * Copyright (c) 2023 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package master

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"

    "github.com/asynkron/protoactor-go/actor"
    "go.uber.org/atomic"
    "google.golang.org/protobuf/proto"

    "github.com/alibaba/schedulerx-worker-go/config"
    "github.com/alibaba/schedulerx-worker-go/internal/common"
    "github.com/alibaba/schedulerx-worker-go/internal/discovery"
    "github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster"
    "github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
    "github.com/alibaba/schedulerx-worker-go/internal/utils"
    "github.com/alibaba/schedulerx-worker-go/logger"
    "github.com/alibaba/schedulerx-worker-go/processor"
    "github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

var _ taskmaster.TaskMaster = &TaskMaster{}

// TaskMaster is the base implementation of taskmaster.TaskMaster.
type TaskMaster struct {
    actorContext             actor.Context               `json:"actorContext,omitempty"`
    instanceStatus           processor.InstanceStatus    `json:"instanceStatus,omitempty"` // WARNING: not concurrency-safe
    taskStatusMap            sync.Map                    `json:"taskStatusMap"`            // key:string, val:TaskStatus
    taskIdGenerator          *atomic.Int64               `json:"taskIdGenerator"`          // WARNING: not concurrency-safe
    localWorkIdAddr          string                      `json:"localWorkIdAddr,omitempty"`
    localContainerRouterPath string                      `json:"localContainerRouterPath,omitempty"`
    localTaskRouterPath      string                      `json:"localTaskRouterPath,omitempty"`
    localInstanceRouterPath  string                      `json:"localInstanceRouterPath,omitempty"`
    jobInstanceInfo          *common.JobInstanceInfo     `json:"jobInstanceInfo,omitempty"`
    jobInstanceProgress      string                      `json:"jobInstanceProgress,omitempty"`
    statusHandler            UpdateInstanceStatusHandler `json:"statusHandler,omitempty"`
    killed                   bool                        `json:"killed,omitempty"` // WARNING: not concurrency-safe
    inited                   bool                        `json:"inited,omitempty"` // WARNING: not concurrency-safe
    aliveCheckWorkerSet      *utils.ConcurrentSet        `json:"aliveCheckWorkerSet,omitempty"` // set of string
    serverDiscovery          discovery.ServiceDiscover   `json:"serverDiscovery"`
    serialNum                *atomic.Int64               `json:"serialNum,omitempty"`          // used by second-level jobs: the current loop count
    existInvalidWorker       bool                        `json:"existInvalidWorker,omitempty"` // whether any invalid worker exists; WARNING: not concurrency-safe
    lock                     sync.RWMutex
}

// NewTaskMaster creates a TaskMaster bound to the given actor context, job instance and status handler.
func NewTaskMaster(actorCtx actor.Context, jobInstanceInfo *common.JobInstanceInfo, statusHandler UpdateInstanceStatusHandler) *TaskMaster {
    workerIdAddr := fmt.Sprintf("%s@%s", utils.GetWorkerId(), actorCtx.ActorSystem().Address())
    taskMaster := &TaskMaster{
        inited:              true,
        instanceStatus:      processor.InstanceStatusRunning,
        taskStatusMap:       sync.Map{},
        taskIdGenerator:     atomic.NewInt64(-1),
        aliveCheckWorkerSet: utils.NewConcurrentSet(),
        jobInstanceInfo:     jobInstanceInfo,
        actorContext:        actorCtx,
        localWorkIdAddr:     workerIdAddr,
        localTaskRouterPath: workerIdAddr,
        serialNum:           atomic.NewInt64(0),
        statusHandler:       statusHandler,
    }
    return taskMaster
}
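// Illustrative usage (not part of the original file): given an actor.Context,
// the job's *common.JobInstanceInfo and an UpdateInstanceStatusHandler, a master
// is created as sketched below (the variable names are hypothetical):
//
//	master := NewTaskMaster(actorCtx, jobInstanceInfo, statusHandler)
//
// The localWorkIdAddr built by the constructor has the form
// "<workerId>@<actorSystemAddress>" and is also used as the localTaskRouterPath.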
func (m *TaskMaster) Init() {
    // m.lock.Lock()
    // defer m.lock.Unlock()
    if !m.inited {
        m.inited = true
    }
}

func (m *TaskMaster) GetActorContext() actor.Context {
    return m.actorContext
}

func (m *TaskMaster) GetLocalWorkerIdAddr() string {
    return m.localWorkIdAddr
}

func (m *TaskMaster) GetLocalJobInstanceRouterPath() string {
    return m.localInstanceRouterPath
}

func (m *TaskMaster) GetLocalContainerRouterPath() string {
    return m.localContainerRouterPath
}

func (m *TaskMaster) GetLocalTaskRouterPath() string {
    return m.localTaskRouterPath
}

// IsJobInstanceFinished returns true only if every task recorded in taskStatusMap
// has reached a finished status.
func (m *TaskMaster) IsJobInstanceFinished() bool {
    isFinished := true
    m.taskStatusMap.Range(func(key, value interface{}) bool {
        status := value.(taskstatus.TaskStatus)
        if !status.IsFinished() {
            isFinished = false
            return false
        }
        return true
    })
    return isFinished
}

// UpdateTaskStatus records the reported task status, recomputes the overall
// instance status and forwards it to the status handler.
func (m *TaskMaster) UpdateTaskStatus(req *schedulerx.ContainerReportTaskStatusRequest) error {
    var (
        jobId         = req.GetJobId()
        jobInstanceId = req.GetJobInstanceId()
        taskId        = req.GetTaskId()
    )
    taskStatus, ok := taskstatus.Convert2TaskStatus(req.GetStatus())
    if !ok {
        return fmt.Errorf("invalid taskstatus: %d got from ContainerReportTaskStatusRequest: %+v", req.GetStatus(), req)
    }
    uniqueId := utils.GetUniqueId(jobId, jobInstanceId, taskId)
    m.taskStatusMap.Store(uniqueId, taskStatus)

    newStatus := processor.InstanceStatusUnknown
    if utils.SyncMapLen(&m.taskStatusMap) > 0 {
        if !m.IsJobInstanceFinished() {
            newStatus = processor.InstanceStatusRunning
        } else {
            newStatus = processor.InstanceStatusSucceed
            // If any subtask has status Failed, report the whole instance as Failed.
            if newStatus != processor.InstanceStatusFailed {
                m.taskStatusMap.Range(func(key, val interface{}) bool {
                    if val.(taskstatus.TaskStatus) == taskstatus.TaskStatusFailed {
                        newStatus = processor.InstanceStatusFailed
                        return false
                    }
                    return true
                })
            }
        }
    }
    m.jobInstanceProgress = req.GetProgress()
    if err := m.updateNewInstanceStatus(req.GetSerialNum(), jobInstanceId, newStatus, req.GetResult()); err != nil {
        return fmt.Errorf("updateNewInstanceStatus failed, err=%s", err.Error())
    }
    return nil
}

func (m *TaskMaster) updateNewInstanceStatus(serialNum int64, jobInstanceId int64, newStatus processor.InstanceStatus, result string) error {
    m.lock.Lock()
    defer m.lock.Unlock()
    // fmt.Printf("serialNum=%d, jobInstanceId=%d, status=%s\n", serialNum, jobInstanceId, newStatus)
    if err := m.statusHandler.Handle(serialNum, newStatus, result); err != nil {
        return fmt.Errorf("update status failed, err=%s", err.Error())
    }
    return nil
}
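// Illustrative only (not part of the original file): a single status report fed
// into UpdateTaskStatus might be assembled as below; jobId, jobInstanceId,
// taskId and statusCode are hypothetical caller-side values, and statusCode must
// be convertible by taskstatus.Convert2TaskStatus.
//
//	req := &schedulerx.ContainerReportTaskStatusRequest{
//	    JobId:         proto.Int64(jobId),
//	    JobInstanceId: proto.Int64(jobInstanceId),
//	    TaskId:        proto.Int64(taskId),
//	    Status:        proto.Int32(statusCode),
//	    Progress:      proto.String("100%"),
//	}
//	if err := master.UpdateTaskStatus(req); err != nil {
//	    logger.Warnf("report task status failed, err=%s", err.Error())
//	}
//
// Internally the status is stored under the key produced by
// utils.GetUniqueId(jobId, jobInstanceId, taskId).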
// BatchUpdateTaskStatus fans a batched report out into per-task UpdateTaskStatus calls.
// TODO: MapTaskMaster may override this method to do real batch processing.
func (m *TaskMaster) BatchUpdateTaskStatus(taskMaster taskmaster.TaskMaster, req *schedulerx.ContainerBatchReportTaskStatuesRequest) error {
    for _, status := range req.GetTaskStatues() {
        containerReportTaskStatusReq := &schedulerx.ContainerReportTaskStatusRequest{
            JobId:         proto.Int64(req.GetJobId()),
            JobInstanceId: proto.Int64(req.GetJobInstanceId()),
            TaskId:        proto.Int64(status.GetTaskId()),
            WorkerAddr:    proto.String(req.GetWorkerAddr()),
            WorkerId:      proto.String(req.GetWorkerId()),
            Status:        proto.Int32(status.GetStatus()),
        }
        if result := status.GetResult(); result != "" {
            containerReportTaskStatusReq.Result = proto.String(result)
        }
        if taskName := status.GetTaskName(); taskName != "" {
            containerReportTaskStatusReq.TaskName = proto.String(taskName)
        }
        if progress := status.GetProgress(); progress != "" {
            containerReportTaskStatusReq.Progress = proto.String(progress)
        }
        if serialNum := req.GetSerialNum(); serialNum != 0 {
            containerReportTaskStatusReq.SerialNum = proto.Int64(serialNum)
        }
        switch tm := taskMaster.(type) {
        case *BroadcastTaskMaster:
            if err := tm.UpdateTaskStatus(containerReportTaskStatusReq); err != nil {
                return fmt.Errorf("BroadcastTaskMaster UpdateTaskStatus failed, err=%s", err.Error())
            }
        case *BatchTaskMaster:
            if err := tm.UpdateTaskStatus(containerReportTaskStatusReq); err != nil {
                return fmt.Errorf("BatchTaskMaster UpdateTaskStatus failed, err=%s", err.Error())
            }
        case *GridTaskMaster:
            if err := tm.UpdateTaskStatus(containerReportTaskStatusReq); err != nil {
                return fmt.Errorf("GridTaskMaster UpdateTaskStatus failed, err=%s", err.Error())
            }
        case *MapTaskMaster:
            if err := tm.UpdateTaskStatus(containerReportTaskStatusReq); err != nil {
                return fmt.Errorf("MapTaskMaster UpdateTaskStatus failed, err=%s", err.Error())
            }
        default:
            if err := m.UpdateTaskStatus(containerReportTaskStatusReq); err != nil {
                return fmt.Errorf("UpdateTaskStatus failed, err=%s", err.Error())
            }
        }
    }
    return nil
}

func (m *TaskMaster) KillInstance(reason string) error {
    // m.lock.Lock()
    m.killed = true
    // m.lock.Unlock()
    GetTimeScheduler().remove(m.jobInstanceInfo.GetJobInstanceId())
    return nil
}

func (m *TaskMaster) DestroyContainerPool() {
    // TODO Implement me
    return
}

func (m *TaskMaster) KillTask(uniqueId, workerId, workerAddr string) {
    // TODO Implement me
    return
}

func (m *TaskMaster) RetryTasks(taskEntities []schedulerx.RetryTaskEntity) {
    // TODO Implement me
    return
}

func (m *TaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error {
    // TODO Implement me
    return nil
}

func (m *TaskMaster) AcquireTaskId() int64 {
    return m.taskIdGenerator.Inc()
}

func (m *TaskMaster) GetJobInstanceProgress() (string, error) {
    return m.jobInstanceProgress, nil
}

func (m *TaskMaster) UpdateNewInstanceStatus(serialNum int64, newStatus processor.InstanceStatus, result string) error {
    return m.updateNewInstanceStatus(serialNum, m.jobInstanceInfo.GetJobInstanceId(), newStatus, result)
}
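// Sketch only (not part of the original file): UpdateNewInstanceStatus delegates
// to the configured UpdateInstanceStatusHandler. Judging from the call site in
// updateNewInstanceStatus, the handler exposes a Handle method roughly like the
// stub below; the actual definition of UpdateInstanceStatusHandler lives elsewhere
// in this package, so treat the exact signature as an assumption.
//
//	type loggingStatusHandler struct{}
//
//	func (h *loggingStatusHandler) Handle(serialNum int64, status processor.InstanceStatus, result string) error {
//	    logger.Infof("serialNum=%d status=%v result=%s", serialNum, status, result)
//	    return nil
//	}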
func (m *TaskMaster) Stop() {
    // do nothing
}

func (m *TaskMaster) Clear(taskMaster taskmaster.TaskMaster) {
    m.taskStatusMap = sync.Map{} // Clear the sync.Map by reassigning it to an empty sync.Map
    m.taskIdGenerator.Store(0)
    m.instanceStatus = processor.InstanceStatusRunning
    m.aliveCheckWorkerSet.Clear()
    if !config.GetWorkerConfig().IsShareContainerPool() {
        taskMaster.DestroyContainerPool()
    }
}

func (m *TaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult {
    return nil
}

func (m *TaskMaster) GetInstanceStatus() processor.InstanceStatus {
    // m.lock.RLock()
    // defer m.lock.RUnlock()
    return m.instanceStatus
}

func (m *TaskMaster) SetInstanceStatus(instanceStatus processor.InstanceStatus) {
    // m.lock.Lock()
    m.instanceStatus = instanceStatus
    // m.lock.Unlock()
}

func (m *TaskMaster) IsKilled() bool {
    // m.lock.RLock()
    // defer m.lock.RUnlock()
    return m.killed
}

func (m *TaskMaster) GetJobInstanceInfo() *common.JobInstanceInfo {
    return m.jobInstanceInfo
}

// GetAliveCheckWorkerSet return set<string>
func (m *TaskMaster) GetAliveCheckWorkerSet() *utils.ConcurrentSet {
    return m.aliveCheckWorkerSet
}

func (m *TaskMaster) IsInited() bool {
    // m.lock.RLock()
    // defer m.lock.RUnlock()
    return m.inited
}

func (m *TaskMaster) GetSerialNum() int64 {
    return m.serialNum.Load()
}

func (m *TaskMaster) AcquireSerialNum() int64 {
    return m.serialNum.Inc()
}

func (m *TaskMaster) ExistInvalidWorker() bool {
    return m.existInvalidWorker
}

func (m *TaskMaster) ResetJobInstanceWorkerList() {
    freeWorkersNum := m.aliveCheckWorkerSet.Len()
    if freeWorkersNum > 0 {
        m.jobInstanceInfo.SetAllWorkers(m.aliveCheckWorkerSet.ToStringSlice())
        m.existInvalidWorker = false
        logger.Infof("restJobInstanceWorkerList appGroupId=%d, instanceId=%d, workerSize=%d.", m.jobInstanceInfo.GetAppGroupId(), m.jobInstanceInfo.GetJobInstanceId(), freeWorkersNum)
    } else {
        logger.Warnf("restJobInstanceWorkerList update appGroupId=%d, instanceId=%d, workers=0.", m.jobInstanceInfo.GetAppGroupId(), m.jobInstanceInfo.GetJobInstanceId())
    }
}

func (m *TaskMaster) GetCurrentSelection() string {
    // TODO implement me
    return ""
}

func (m *TaskMaster) convert2StartContainerRequest(jobInstanceInfo *common.JobInstanceInfo, taskId int64, taskName string, taskBody []byte, failover bool) (*schedulerx.MasterStartContainerRequest, error) {
    req := &schedulerx.MasterStartContainerRequest{
        JobId:                  proto.Int64(jobInstanceInfo.GetJobId()),
        JobInstanceId:          proto.Int64(jobInstanceInfo.GetJobInstanceId()),
        TaskId:                 proto.Int64(taskId),
        User:                   proto.String(jobInstanceInfo.GetUser()),
        JobType:                proto.String(jobInstanceInfo.GetJobType()),
        Content:                proto.String(jobInstanceInfo.GetContent()),
        ScheduleTime:           proto.Int64(jobInstanceInfo.GetScheduleTime().UnixMilli()),
        DataTime:               proto.Int64(jobInstanceInfo.GetDataTime().UnixMilli()),
        Parameters:             proto.String(jobInstanceInfo.GetParameters()),
        InstanceParameters:     proto.String(jobInstanceInfo.GetInstanceParameters()),
        GroupId:                proto.String(jobInstanceInfo.GetGroupId()),
        MaxAttempt:             proto.Int32(jobInstanceInfo.GetMaxAttempt()),
        Attempt:                proto.Int32(jobInstanceInfo.GetAttempt()),
        InstanceMasterAkkaPath: proto.String(m.GetLocalTaskRouterPath()),
    }
    if upstreamDatas := jobInstanceInfo.GetUpstreamData(); len(upstreamDatas) > 0 {
        req.UpstreamData = []*schedulerx.UpstreamData{}
        for _, jobInstanceData := range upstreamDatas {
            req.UpstreamData = append(req.UpstreamData, &schedulerx.UpstreamData{
                JobName: proto.String(jobInstanceData.GetJobName()),
                Data:    proto.String(jobInstanceData.GetData()),
            })
        }
    }
    if xattrs := jobInstanceInfo.GetXattrs(); len(xattrs) > 0 {
        mapTaskXAttrs := common.NewMapTaskXAttrs()
        if err := json.Unmarshal([]byte(xattrs), mapTaskXAttrs); err != nil {
            return nil, fmt.Errorf("Json unmarshal to mapTaskXAttrs failed, xattrs=%s, err=%s ", xattrs, err.Error())
        }
        req.ConsumerNum = proto.Int32(mapTaskXAttrs.GetConsumerSize())
        req.MaxAttempt = proto.Int32(mapTaskXAttrs.GetTaskMaxAttempt())
        req.TaskAttemptInterval = proto.Int32(mapTaskXAttrs.GetTaskAttemptInterval())
    }
    if taskName != "" {
        req.TaskName = proto.String(taskName)
    }
    if len(taskBody) > 0 {
        req.Task = taskBody
    }
    if failover {
        req.Failover = proto.Bool(true)
    }
    if jobInstanceInfo.GetWfInstanceId() >= 0 {
        req.WfInstanceId = proto.Int64(jobInstanceInfo.GetWfInstanceId())
    }
    req.SerialNum = proto.Int64(m.GetSerialNum())
    req.ExecuteMode = proto.String(jobInstanceInfo.GetExecuteMode())
    if len(jobInstanceInfo.GetJobName()) > 0 {
        req.JobName = proto.String(jobInstanceInfo.GetJobName())
    }
    req.TimeType = proto.Int32(jobInstanceInfo.GetTimeType())
    req.TimeExpression = proto.String(jobInstanceInfo.GetTimeExpression())
    return req, nil
}

func (m *TaskMaster) RestJobInstanceWorkerList(freeWorkers *utils.Set) {
    if freeWorkers.Size() > 0 {
        m.jobInstanceInfo.SetAllWorkers(freeWorkers.ToStringSlice())
        m.existInvalidWorker = false
        logger.Infof("restJobInstanceWorkerList appGroupId=%v instanceId=%v workerSize=%v", m.jobInstanceInfo.GetAppGroupId(), m.jobInstanceInfo.GetJobInstanceId(), freeWorkers.Size())
    } else {
        logger.Warnf("restJobInstanceWorkerList update appGroupId=%v instanceId=%v workers=0.", m.jobInstanceInfo.GetAppGroupId(), m.jobInstanceInfo.GetJobInstanceId())
    }
}
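// Illustrative only (not part of the original file): concrete masters (for
// example a map-style master) hand out task ids from taskIdGenerator and then
// build start requests for workers via convert2StartContainerRequest. A minimal
// sketch, with taskName and taskBody as hypothetical inputs:
//
//	taskId := m.AcquireTaskId()
//	startReq, err := m.convert2StartContainerRequest(m.GetJobInstanceInfo(), taskId, taskName, taskBody, false)
//	if err != nil {
//	    return err
//	}
//	// startReq is then routed to a worker to launch the container for this task.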