internal/master/standalone_task_master.go

/*
 * Copyright (c) 2023 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package master

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/asynkron/protoactor-go/actor"
	"google.golang.org/protobuf/proto"

	"github.com/alibaba/schedulerx-worker-go/config"
	actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
	"github.com/alibaba/schedulerx-worker-go/internal/common"
	"github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster"
	"github.com/alibaba/schedulerx-worker-go/internal/masterpool"
	"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
	"github.com/alibaba/schedulerx-worker-go/internal/remoting/pool"
	"github.com/alibaba/schedulerx-worker-go/internal/tasks"
	"github.com/alibaba/schedulerx-worker-go/internal/utils"
	"github.com/alibaba/schedulerx-worker-go/logger"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

var _ taskmaster.TaskMaster = &StandaloneTaskMaster{}

// StandaloneTaskMaster executes a job instance as a single task on one selected worker.
type StandaloneTaskMaster struct {
	*TaskMaster
	currentSelection      string // address of the currently selected worker
	actorCtx              actor.Context
	tasks                 *tasks.TaskMap
	connpool              pool.ConnPool
	taskMasterPoolCleaner func(int64)
}

func NewStandaloneTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) taskmaster.TaskMaster {
	var (
		connpool              = pool.GetConnPool()
		taskMasterPool        = masterpool.GetTaskMasterPool()
		taskMasterPoolCleaner = func(jobInstanceId int64) {
			if taskMaster := taskMasterPool.Get(jobInstanceId); taskMaster != nil {
				taskMaster.Stop()
				taskMasterPool.Remove(jobInstanceId)
			}
		}
	)
	standaloneTaskMaster := &StandaloneTaskMaster{
		actorCtx:              actorCtx,
		taskMasterPoolCleaner: taskMasterPoolCleaner,
		tasks:                 taskMasterPool.Tasks(),
		connpool:              connpool,
		currentSelection:      actorCtx.Self().Address,
	}
	statusHandler := NewCommonUpdateInstanceStatusHandler(actorCtx, standaloneTaskMaster, jobInstanceInfo)
	if utils.IsSecondTypeJob(common.TimeType(jobInstanceInfo.GetTimeType())) {
		statusHandler = NewSecondJobUpdateInstanceStatusHandler(actorCtx, standaloneTaskMaster, jobInstanceInfo)
	}
	standaloneTaskMaster.TaskMaster = NewTaskMaster(actorCtx, jobInstanceInfo, statusHandler)
	return standaloneTaskMaster
}

func (m *StandaloneTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error {
	var (
		err        error
		uniqueId   string
		taskId     int64
		workerId   = utils.GetWorkerId()
		workerAddr = m.GetCurrentSelection()
	)
	// Any error set below triggers a failed-status report for the task.
	defer func() {
		if err != nil {
			logger.Errorf("Standalone taskMaster submitInstance failed, workerAddr=%s, uniqueId=%s, err=%s", workerAddr, uniqueId, err.Error())
			m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusFailed)
			failedReq := &schedulerx.ContainerReportTaskStatusRequest{
				JobId:         proto.Int64(jobInstanceInfo.GetJobId()),
				JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()),
				TaskId:        proto.Int64(taskId),
				Status:        proto.Int32(int32(taskstatus.TaskStatusFailed)),
				WorkerId:      proto.String(workerId),
				WorkerAddr:    proto.String(workerAddr),
				SerialNum:     proto.Int64(m.serialNum.Load()),
			}
			m.UpdateTaskStatus(failedReq)
		}
	}()
	taskId = m.AcquireTaskId()
	uniqueId = utils.GetUniqueId(jobInstanceInfo.GetJobId(), jobInstanceInfo.GetJobInstanceId(), taskId)
	req, err := m.convert2StartContainerRequest(jobInstanceInfo, taskId, "", nil, false)
	if err != nil {
		logger.Errorf("SubmitInstance failed, jobInstanceInfo=%+v, err=%s.", jobInstanceInfo, err.Error())
		m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusFailed)
		return err
	}

	// If standalone dispatch is enabled for second-delay jobs, the executing worker is selected round-robin.
	if config.GetWorkerConfig().IsDispatchSecondDelayStandalone() && common.TimeType(jobInstanceInfo.GetTimeType()) == common.TimeTypeSecondDelay {
		workerIdAddr := m.selectWorker()
		workerInfo := strings.Split(workerIdAddr, "@")
		workerId = workerInfo[0]
		workerAddr = actorcomm.GetRealWorkerAddr(workerIdAddr)
		m.currentSelection = workerAddr
	}

	response, e := m.actorContext.RequestFuture(actorcomm.GetContainerRouterPid(m.currentSelection), req, 10*time.Second).Result()
	if e != nil {
		err = fmt.Errorf("request to containerPid failed, err=%s", e.Error())
		return err
	}
	resp, ok := response.(*schedulerx.MasterStartContainerResponse)
	if !ok {
		m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusFailed)
		err = fmt.Errorf("response is not MasterStartContainerResponse, resp=%+v", response)
		return err
	}
	if resp.GetSuccess() {
		m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusInit)
		logger.Infof("Standalone taskMaster init worker succeed, workerAddr=%s, uniqueId=%s", workerAddr, uniqueId)
		return nil
	}
	err = fmt.Errorf("start container request failed: %s", resp.GetMessage())
	return err
}

// selectWorker picks the worker to execute on, rotating round-robin through all
// available workers based on the task master's serial number.
func (m *StandaloneTaskMaster) selectWorker() string {
	workers := m.GetJobInstanceInfo().GetAllWorkers()
	workersCnt := len(workers)
	idx := 0
	if workersCnt == 0 {
		return ""
	} else if serialNum := m.GetSerialNum(); serialNum > int64(workersCnt) {
		idx = int(serialNum % int64(workersCnt))
	}
	return workers[idx]
}
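// Illustrative walk-through of selectWorker's round-robin arithmetic (added
// for clarity; the numbers are an example, not from this repository): with
// workers = [w0, w1, w2] and serialNum = 7, idx = 7 % 3 = 1, so w1 is chosen;
// for any serialNum <= 3 the index stays 0 and w0 is kept.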
func (m *StandaloneTaskMaster) KillInstance(reason string) error {
	uniqueId := utils.GetUniqueIdWithoutTaskId(m.jobInstanceInfo.GetJobId(), m.jobInstanceInfo.GetJobInstanceId())
	req := &schedulerx.MasterKillContainerRequest{
		JobId:                 proto.Int64(m.jobInstanceInfo.GetJobId()),
		JobInstanceId:         proto.Int64(m.jobInstanceInfo.GetJobInstanceId()),
		MayInterruptIfRunning: proto.Bool(false),
	}
	response, err := m.actorContext.RequestFuture(actorcomm.GetContainerRouterPid(m.currentSelection), req, 10*time.Second).Result()
	if err != nil {
		return fmt.Errorf("send kill instance request exception, workerAddr=%v, uniqueId=%v, err=%s", m.currentSelection, uniqueId, err.Error())
	}
	resp, ok := response.(*schedulerx.MasterKillContainerResponse)
	if !ok {
		return fmt.Errorf("response is not MasterKillContainerResponse, resp=%+v", response)
	}
	if resp.GetSuccess() {
		logger.Infof("Standalone taskMaster kill instance succeed, workerAddr=%s, uniqueId=%s", m.currentSelection, uniqueId)
		return nil
	}
	// The kill request was rejected: mark the instance failed and report the reason.
	if err = m.updateNewInstanceStatus(m.GetSerialNum(), m.jobInstanceInfo.GetJobInstanceId(), processor.InstanceStatusFailed, reason); err != nil {
		return fmt.Errorf("UpdateNewInstanceStatus failed, err=%s", err.Error())
	}
	if !m.instanceStatus.IsFinished() {
		m.lock.Lock()
		m.instanceStatus = processor.InstanceStatusFailed
		m.lock.Unlock()
	}
	return nil
}

func (m *StandaloneTaskMaster) DestroyContainerPool() {
	req := &schedulerx.MasterDestroyContainerPoolRequest{
		JobInstanceId: proto.Int64(m.jobInstanceInfo.GetJobInstanceId()),
		SerialNum:     proto.Int64(m.GetSerialNum()),
	}
	if err := m.actorContext.RequestFuture(actorcomm.GetContainerRouterPid(m.currentSelection), req, 5*time.Second).Wait(); err != nil {
		logger.Errorf("Destroy containerPool failed, err: %s", err.Error())
	}
}

func (m *StandaloneTaskMaster) CheckProcessor() error {
	// TODO: implement me
	return nil
}

func (m *StandaloneTaskMaster) GetCurrentSelection() string {
	return m.currentSelection
}
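// Usage sketch (illustrative only; this call site is an assumption for
// documentation, not code from this repository). A job-instance handler would
// typically build the master, register it in the master pool, and submit:
//
//	master := NewStandaloneTaskMaster(jobInstanceInfo, actorCtx)
//	// hypothetical registration call; see masterpool for the actual pool API
//	masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), master)
//	if err := master.SubmitInstance(context.Background(), jobInstanceInfo); err != nil {
//		logger.Errorf("Standalone taskMaster submit failed, err=%s", err.Error())
//	}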