internal/actor/container_actor.go

/*
 * Copyright (c) 2023 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package actor

import (
	"context"
	"encoding/json"
	"fmt"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"github.com/asynkron/protoactor-go/actor"
	"github.com/panjf2000/ants/v2"
	"google.golang.org/protobuf/proto"

	"github.com/alibaba/schedulerx-worker-go/config"
	"github.com/alibaba/schedulerx-worker-go/internal/batch"
	"github.com/alibaba/schedulerx-worker-go/internal/common"
	"github.com/alibaba/schedulerx-worker-go/internal/constants"
	"github.com/alibaba/schedulerx-worker-go/internal/container"
	"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
	"github.com/alibaba/schedulerx-worker-go/internal/utils"
	"github.com/alibaba/schedulerx-worker-go/logger"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

var _ actor.Actor = &containerActor{}

// defaultActorPool is the shared goroutine pool used to start containers asynchronously.
// ants invokes the panic handler with the already-recovered value, so it is logged
// directly together with the stack trace.
var defaultActorPool, _ = ants.NewPool(
	ants.DefaultAntsPoolSize,
	ants.WithPanicHandler(func(i interface{}) {
		logger.Errorf("Panic happened in containerStarter, %v\n%s", i, debug.Stack())
	}))

type containerActor struct {
	enableShareContainerPool  bool
	containerPool             container.ContainerPool
	statusReqBatchHandlerPool *batch.ContainerStatusReqHandlerPool
	batchSize                 int32
	containerStarter          *ants.Pool
	lock                      sync.Mutex
}

func newContainerActor() *containerActor {
	return &containerActor{
		enableShareContainerPool:  config.GetWorkerConfig().IsShareContainerPool(),
		batchSize:                 config.GetWorkerConfig().WorkerMapPageSize(),
		statusReqBatchHandlerPool: batch.GetContainerStatusReqHandlerPool(),
		containerPool:             container.GetThreadContainerPool(),
		containerStarter:          defaultActorPool,
		lock:                      sync.Mutex{},
	}
}

func (a *containerActor) Receive(actorCtx actor.Context) {
	switch msg := actorCtx.Message().(type) {
	case *schedulerx.MasterStartContainerRequest:
		a.handleStartContainer(actorCtx, msg)
	case *schedulerx.MasterBatchStartContainersRequest:
		a.handleBatchStartContainers(actorCtx, msg)
	case *schedulerx.MasterKillContainerRequest:
		a.handleKillContainer(actorCtx, msg)
	case *schedulerx.MasterDestroyContainerPoolRequest:
		a.handleDestroyContainerPool(actorCtx, msg)
	default:
		logger.Warnf("[containerActor] receive unknown message, msg=%+v", actorCtx.Message())
	}
}

func (a *containerActor) handleStartContainer(actorCtx actor.Context, req *schedulerx.MasterStartContainerRequest) {
	resp := new(schedulerx.MasterStartContainerResponse)
	uniqueId, err := a.startContainer(actorCtx, req)
	if err != nil {
		logger.Errorf("submit container to containerPool failed, err=%s, uniqueId=%v, cost=%vms", err.Error(), uniqueId, time.Now().UnixMilli()-req.GetScheduleTime())
		resp = &schedulerx.MasterStartContainerResponse{
			Success: proto.Bool(false),
			Message: proto.String(err.Error()),
		}
		if senderPid := actorCtx.Sender(); senderPid != nil {
			actorCtx.Send(senderPid, resp)
		} else {
			logger.Warnf("Cannot send MasterStartContainerResponse due to sender is unknown in handleStartContainer of containerActor, request=%+v", req)
		}
		return
	}
	logger.Debugf("submit container to containerPool, uniqueId=%v, cost=%vms", uniqueId, time.Now().UnixMilli()-req.GetScheduleTime())
	resp = &schedulerx.MasterStartContainerResponse{Success: proto.Bool(true)}
	if senderPid := actorCtx.Sender(); senderPid != nil {
		actorCtx.Send(senderPid, resp)
	} else {
		logger.Warnf("Cannot send MasterStartContainerResponse due to sender is unknown in handleStartContainer of containerActor, request=%+v", req)
	}
}
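
// handleBatchStartContainers starts a batch of containers asynchronously on the shared
// containerStarter pool. Any request that fails to start is reported back to the task
// master as TaskStatusFailed; the batch response only reflects whether the batch itself
// could be submitted to the pool.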
func (a *containerActor) handleBatchStartContainers(actorCtx actor.Context, req *schedulerx.MasterBatchStartContainersRequest) {
	err := a.containerStarter.Submit(func() {
		for _, startReq := range req.StartReqs {
			uniqueId, err := a.startContainer(actorCtx, startReq)
			if err != nil {
				// report task fail status to task master
				reportTaskStatusReq := &schedulerx.ContainerReportTaskStatusRequest{
					JobId:         proto.Int64(startReq.GetJobId()),
					JobInstanceId: proto.Int64(startReq.GetJobInstanceId()),
					TaskId:        proto.Int64(startReq.GetTaskId()),
					Status:        proto.Int32(int32(taskstatus.TaskStatusFailed)),
					WorkerId:      proto.String(utils.GetWorkerId()),
					WorkerAddr:    proto.String(actorCtx.ActorSystem().Address()),
				}
				if startReq.GetTaskName() != "" {
					reportTaskStatusReq.TaskName = proto.String(startReq.GetTaskName())
				}
				if senderPid := actorCtx.Sender(); senderPid != nil {
					actorCtx.Send(senderPid, reportTaskStatusReq)
				} else {
					logger.Warnf("Cannot send ContainerReportTaskStatusRequest due to sender is unknown in handleBatchStartContainers of containerActor, request=%+v", req)
				}
				// this container failed to start, skip the success log and continue with the next request
				continue
			}
			logger.Debugf("submit container to containerPool, uniqueId=%v, cost=%vms", uniqueId, time.Now().UnixMilli()-startReq.GetScheduleTime())
		}
	})
	resp := new(schedulerx.MasterBatchStartContainersResponse)
	if err != nil {
		logger.Errorf("handleBatchStartContainers failed, err=%s", err.Error())
		resp = &schedulerx.MasterBatchStartContainersResponse{
			Success: proto.Bool(false),
			Message: proto.String(err.Error()),
		}
	} else {
		resp = &schedulerx.MasterBatchStartContainersResponse{
			Success: proto.Bool(true),
		}
	}
	if senderPid := actorCtx.Sender(); senderPid != nil {
		actorCtx.Request(senderPid, resp)
	} else {
		logger.Warnf("Cannot send MasterBatchStartContainersResponse due to sender is unknown in handleBatchStartContainers of containerActor, request=%+v", req)
	}
}

func (a *containerActor) convert2ContainerReportTaskStatusRequest(actorCtx actor.Context, req *schedulerx.MasterStartContainerRequest, status taskstatus.TaskStatus, result string) *schedulerx.ContainerReportTaskStatusRequest {
	return &schedulerx.ContainerReportTaskStatusRequest{
		JobId:                   proto.Int64(req.GetJobId()),
		JobInstanceId:           proto.Int64(req.GetJobInstanceId()),
		TaskId:                  proto.Int64(req.GetTaskId()),
		TaskName:                proto.String(req.GetTaskName()),
		Status:                  proto.Int32(int32(status)),
		WorkerAddr:              proto.String(actorCtx.ActorSystem().Address()),
		Result:                  proto.String(result),
		WorkerId:                proto.String(utils.GetWorkerId()),
		InstanceMasterActorPath: proto.String(req.GetInstanceMasterAkkaPath()),
		TimeType:                proto.Int32(req.GetTimeType()),
		SerialNum:               proto.Int64(req.GetSerialNum()),
	}
}

func (a *containerActor) handleKillContainer(actorCtx actor.Context, req *schedulerx.MasterKillContainerRequest) {
	var (
		jobId         = req.GetJobId()
		jobInstanceId = req.GetJobInstanceId()
		taskId        = req.GetTaskId()
		uniqueId      = utils.GetUniqueId(jobId, jobInstanceId, taskId)
	)
	if req.GetTaskId() != 0 {
		// kill task container
		if a.containerPool.Contain(uniqueId) {
			a.containerPool.Get(uniqueId).Kill()
		}
		logger.Infof("kill task container success, uniqueId=%v", uniqueId)
	} else {
		uniqueId = utils.GetUniqueIdWithoutTaskId(jobId, jobInstanceId)
		// kill instance container pool
		a.killInstance(jobId, jobInstanceId)
		logger.Infof("kill instance success, uniqueId=%v", uniqueId)
	}
	resp := &schedulerx.MasterKillContainerResponse{
		Success: proto.Bool(true),
	}
	if senderPid := actorCtx.Sender(); senderPid != nil {
		actorCtx.Request(senderPid, resp)
	} else {
		logger.Warnf("Cannot send MasterKillContainerResponse due to sender is unknown in handleKillContainer of containerActor, request=%+v", req)
	}
}
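
// handleDestroyContainerPool stops the status-report handler and destroys the containers
// of the given job instance unless the shared container pool is enabled, then acknowledges
// the request with the original deliveryId when the sender is known.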
func (a *containerActor) handleDestroyContainerPool(actorCtx actor.Context, req *schedulerx.MasterDestroyContainerPoolRequest) {
	if !a.enableShareContainerPool {
		// handler, ok := a.statusReqBatchHandlerPool.GetHandlers().Load(req.GetJobInstanceId())
		// if ok {
		a.lock.Lock()
		defer a.lock.Unlock()
		logger.Infof("handleDestroyContainerPool from jobInstanceId=%v.", req.GetJobInstanceId())
		a.statusReqBatchHandlerPool.Stop(req.GetJobInstanceId())
		a.containerPool.DestroyByInstance(req.GetJobInstanceId())
		/*
			if h, ok := handler.(*batch.ContainerStatusReqHandler); ok {
				if latestRequest := h.GetLatestRequest(); latestRequest != nil {
					reportTaskStatusRequest, ok := latestRequest.(*schedulerx.ContainerReportTaskStatusRequest)
					if ok {
						if reportTaskStatusRequest.GetSerialNum() != req.GetSerialNum() {
							logger.Infof("skip handleDestroyContainerPool cycleId=%v_%v, handler serialNum=%v.", req.GetJobInstanceId(), req.GetSerialNum(), reportTaskStatusRequest.GetSerialNum())
							return
						}
						logger.Infof("handleDestroyContainerPool from cycleId=%v_%v, handler serialNum=%v.", req.GetJobInstanceId(), req.GetSerialNum(), reportTaskStatusRequest.GetSerialNum())
						a.statusReqBatchHandlerPool.Stop(req.GetJobInstanceId())
						a.containerPool.DestroyByInstance(req.GetJobInstanceId())
					}
				}
			}
		*/
		// }
	}
	if senderPid := actorCtx.Sender(); senderPid != nil {
		response := &schedulerx.MasterDestroyContainerPoolResponse{
			Success:    proto.Bool(true),
			DeliveryId: proto.Int64(req.GetDeliveryId()),
		}
		actorCtx.Send(senderPid, response)
	} else {
		logger.Warnf("Cannot send MasterDestroyContainerPoolResponse due to sender is unknown in handleDestroyContainerPool of containerActor, request=%+v", req)
	}
	// a.containerPool.ReleaseInstanceLock(req.GetJobInstanceId())
}

// killInstance kills every running container that belongs to the given job instance and
// stops its status-report handler; the instance's container pool is destroyed unless the
// shared container pool is enabled.
func (a *containerActor) killInstance(jobId, jobInstanceId int64) {
	containerMap := a.containerPool.GetContainerMap()
	prefixKey := fmt.Sprintf("%d%s%d", jobId, utils.SplitterToken, jobInstanceId)
	containerMap.Range(func(key, value any) bool {
		var (
			uniqueId  = key.(string)
			container = value.(container.Container)
		)
		if strings.HasPrefix(uniqueId, prefixKey) {
			container.Kill()
			containerMap.Delete(uniqueId)
			a.statusReqBatchHandlerPool.Stop(jobInstanceId)
		}
		return true
	})
	if !a.enableShareContainerPool {
		a.containerPool.DestroyByInstance(jobInstanceId)
	}
}

func (a *containerActor) startContainer(actorCtx actor.Context, req *schedulerx.MasterStartContainerRequest) (string, error) {
	uniqueId := utils.GetUniqueId(req.GetJobId(), req.GetJobInstanceId(), req.GetTaskId())
	logger.Debugf("startContainer, uniqueId=%v, req=%+v, cost=%vms", uniqueId, req, time.Now().UnixMilli()-req.GetScheduleTime())
	jobCtx, err := convertMasterStartContainerRequest2JobContext(req)
	if err != nil {
		return "", err
	}
	container, err := container.NewThreadContainer(jobCtx, actorCtx, container.GetThreadContainerPool())
	if err != nil {
		return "", err
	}
	if container != nil {
		a.lock.Lock()
		defer a.lock.Unlock()
		a.containerPool.Put(uniqueId, container)

		// Whether to share containerPool. If shared, statusReqBatchHandlerPool has only one handler with key=0.
		statusReqBatchHandlerKey := int64(0)
		if !a.enableShareContainerPool {
			statusReqBatchHandlerKey = req.GetJobInstanceId()
		}
		if !a.statusReqBatchHandlerPool.Contains(statusReqBatchHandlerKey) {
			// support 1.5 million requests
			reqQueue := batch.NewReqQueue(config.GetWorkerConfig().QueueSize())
			a.statusReqBatchHandlerPool.Start(
				statusReqBatchHandlerKey,
				batch.NewContainerStatusReqHandler(statusReqBatchHandlerKey, 1, 1, a.batchSize, reqQueue, req.GetInstanceMasterAkkaPath()),
			)
		}
		consumerNum := int32(constants.ConsumerNumDefault)
		if req.GetConsumerNum() > 0 {
			consumerNum = req.GetConsumerNum()
		}
		if err = a.containerPool.Submit(req.GetJobId(), req.GetJobInstanceId(), req.GetTaskId(), container, consumerNum); err != nil {
			return "", err
		}
	} else {
		logger.Warnf("Container is nil, uniqueId=%s", uniqueId)
	}
	return uniqueId, nil
}
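
// convertMasterStartContainerRequest2JobContext copies the fields of a MasterStartContainerRequest
// into a jobcontext.JobContext. If the request carries a task payload, it is attached to the context
// and, when it decodes as a sharding task, the sharding id and parameter are taken from it; without
// a payload the task id is used as the sharding id.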
func convertMasterStartContainerRequest2JobContext(req *schedulerx.MasterStartContainerRequest) (*jobcontext.JobContext, error) {
	jobCtx := new(jobcontext.JobContext)
	jobCtx.SetJobId(req.GetJobId())
	jobCtx.SetJobInstanceId(req.GetJobInstanceId())
	jobCtx.SetTaskId(req.GetTaskId())
	jobCtx.SetScheduleTime(time.UnixMilli(req.GetScheduleTime()))
	jobCtx.SetDataTime(time.UnixMilli(req.GetDataTime()))
	jobCtx.SetExecuteMode(req.GetExecuteMode())
	jobCtx.SetJobType(req.GetJobType())
	jobCtx.SetContent(req.GetContent())
	jobCtx.SetJobParameters(req.GetParameters())
	jobCtx.SetInstanceParameters(req.GetInstanceParameters())
	jobCtx.SetUser(req.GetUser())
	jobCtx.SetInstanceMasterActorPath(req.GetInstanceMasterAkkaPath())
	jobCtx.SetGroupId(req.GetGroupId())
	jobCtx.SetMaxAttempt(req.GetMaxAttempt())
	jobCtx.SetAttempt(req.GetAttempt())
	jobCtx.SetTaskName(req.GetTaskName())
	if req.GetTask() != nil {
		jobCtx.SetTask(req.GetTask())

		// If it's a sharding task, set the sharding id and sharding parameters.
		shardingTask := new(common.ShardingTask)
		if err := json.Unmarshal(req.GetTask(), shardingTask); err == nil {
			jobCtx.SetShardingId(shardingTask.GetId())
			jobCtx.SetShardingParameter(shardingTask.GetParameter())
		}
	} else {
		jobCtx.SetShardingId(req.GetTaskId())
	}
	jobCtx.SetTaskMaxAttempt(req.GetTaskMaxAttempt())
	jobCtx.SetTaskAttemptInterval(req.GetTaskAttemptInterval())

	upstreamData := make([]*common.JobInstanceData, 0, len(req.GetUpstreamData()))
	for _, data := range req.GetUpstreamData() {
		jobInstanceData := new(common.JobInstanceData)
		jobInstanceData.SetJobName(data.GetJobName())
		jobInstanceData.SetData(data.GetData())
		upstreamData = append(upstreamData, jobInstanceData)
	}
	jobCtx.SetUpstreamData(upstreamData)
	jobCtx.SetWfInstanceId(req.GetWfInstanceId())
	jobCtx.SetSerialNum(req.GetSerialNum())
	jobCtx.SetJobName(req.GetJobName())
	jobCtx.SetShardingNum(req.GetShardingNum())
	jobCtx.SetTimeType(req.GetTimeType())
	jobCtx.SetTimeExpression(req.GetTimeExpression())
	jobCtx.Context = context.Background()
	return jobCtx, nil
}