internal/actor/job_instance_actor.go (337 lines of code) (raw):

/* * Copyright (c) 2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package actor import ( "context" "fmt" "time" "github.com/tidwall/gjson" "github.com/alibaba/schedulerx-worker-go/processor" "github.com/asynkron/protoactor-go/actor" "google.golang.org/protobuf/proto" "github.com/alibaba/schedulerx-worker-go/internal/actor/common" "github.com/alibaba/schedulerx-worker-go/internal/common" "github.com/alibaba/schedulerx-worker-go/internal/master" "github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster" "github.com/alibaba/schedulerx-worker-go/internal/masterpool" "github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx" "github.com/alibaba/schedulerx-worker-go/internal/remoting/pool" "github.com/alibaba/schedulerx-worker-go/internal/utils" "github.com/alibaba/schedulerx-worker-go/logger" ) var _ actor.Actor = &jobInstanceActor{} type jobInstanceActor struct { connpool pool.ConnPool taskmasterPool *masterpool.TaskMasterPool } func newJobInstanceActor(actorSystem *actor.ActorSystem) *jobInstanceActor { jActor := &jobInstanceActor{ connpool: pool.GetConnPool(), taskmasterPool: masterpool.GetTaskMasterPool(), } resolver := func(pid *actor.PID) (actor.Process, bool) { if actorcomm.IsSchedulerxServer(pid) { return newJobInstanceProcessor(jActor.connpool), true } // If communicate with actors other than server, then use the default handler (return false) return nil, false } actorSystem.ProcessRegistry.RegisterAddressResolver(resolver) return jActor } func (a *jobInstanceActor) Receive(ctx actor.Context) { switch msg := ctx.Message().(type) { case *schedulerx.WorkerBatchReportTaskStatuesResponse: // send to atLeastOnceDeliveryRoutingActor // actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{ // Msg: msg, // } // FIXME atLeastOnceDelivery not yet implement, retry 3 times, interval 30s a.handleReportWorkerStatus(ctx, ctx.Message()) case *schedulerx.WorkerReportJobInstanceStatusResponse: // send to atLeastOnceDeliveryRoutingActor // actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{ // Msg: msg, // } // FIXME atLeastOnceDelivery not yet implement, retry 3 times, interval 30s a.handleReportWorkerStatus(ctx, ctx.Message()) case *actorcomm.SchedulerWrappedMsg: switch innerMsg := msg.Msg.(type) { case *schedulerx.ServerSubmitJobInstanceRequest: if err := a.handleSubmitJobInstance(ctx, msg); err != nil { logger.Errorf("handleSubmitJobInstanceRequest failed, err=%s", err.Error()) } case *schedulerx.ServerKillJobInstanceRequest: if err := a.handleKillJobInstance(ctx, msg); err != nil { logger.Errorf("handleKillJobInstanceRequest failed, err=%s", err.Error()) } case *schedulerx.ServerRetryTasksRequest: a.handleRetryTasks(ctx, msg) case *schedulerx.ServerKillTaskRequest: a.handleKillTask(ctx, msg) case *schedulerx.ServerCheckTaskMasterRequest: a.handCheckTaskMaster(ctx, msg) case *schedulerx.MasterNotifyWorkerPullRequest: a.handleInitPull(ctx, msg) case *schedulerx.WorkerReportJobInstanceStatusRequest: // forward to server ctx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), msg) case *schedulerx.WorkerReportJobInstanceStatusResponse, *schedulerx.WorkerBatchReportTaskStatuesResponse: // send to atLeastOnceDeliveryRoutingActor actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{ Msg: innerMsg, SenderPath: msg.SenderPath, } default: logger.Errorf("Receive unknown message in jobInstanceActor, msg=%+v", msg) } } } func (a *jobInstanceActor) handleSubmitJobInstance(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) error { var ( req = msg.Msg.(*schedulerx.ServerSubmitJobInstanceRequest) taskMasterPool = masterpool.GetTaskMasterPool() ) logger.Infof("handleSubmitJobInstance, jobInstanceId=%d, req=%+v", req.GetJobInstanceId(), req) if taskMasterPool.Contains(req.GetJobInstanceId()) { errMsg := fmt.Sprintf("jobInstanceId=%d is still running!", req.GetJobInstanceId()) logger.Infof(errMsg) resp := &schedulerx.ServerSubmitJobInstanceResponse{ Success: proto.Bool(false), Message: proto.String(errMsg), } actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) } else { resp := &schedulerx.ServerSubmitJobInstanceResponse{ Success: proto.Bool(true), } actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) jobInstanceInfo := convert2JobInstanceInfo(req) // check job is registered jobName := gjson.Get(jobInstanceInfo.GetContent(), "jobName").String() // Compatible with the existing Java language configuration mechanism if jobInstanceInfo.GetJobType() == "java" { jobName = gjson.Get(jobInstanceInfo.GetContent(), "className").String() } task, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName) if !ok || task == nil { logger.Errorf("handleSubmitJobInstance error, jobName=%s is unregistered. ", jobName) // report job instance status with at-least-once-delivery req := &schedulerx.WorkerReportJobInstanceStatusRequest{ JobId: proto.Int64(jobInstanceInfo.GetJobId()), JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()), Status: proto.Int32(int32(processor.InstanceStatusFailed)), DeliveryId: proto.Int64(utils.GetDeliveryId()), GroupId: proto.String(jobInstanceInfo.GetGroupId()), Result: proto.String(fmt.Sprintf("jobName=%s is unregistered", jobName)), } actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{ Msg: req, } } else { var taskMaster taskmaster.TaskMaster switch common.ExecuteMode(jobInstanceInfo.GetExecuteMode()) { case common.StandaloneExecuteMode: taskMaster = master.NewStandaloneTaskMaster(jobInstanceInfo, actorCtx) case common.BroadcastExecuteMode: taskMaster = master.NewBroadcastTaskMaster(jobInstanceInfo, actorCtx) case common.BatchExecuteMode: taskMaster = master.NewBatchTaskMaster(jobInstanceInfo, actorCtx) case common.ParallelExecuteMode: taskMaster = master.NewParallelTaskMaster(jobInstanceInfo, actorCtx) case common.GridExecuteMode: taskMaster = master.NewGridTaskMaster(jobInstanceInfo, actorCtx) case common.ShardingExecuteMode: taskMaster = master.NewShardingTaskMaster(jobInstanceInfo, actorCtx) default: logger.Errorf("Submit jobInstanceId=%d failed, unknown executeMode=%s", jobInstanceInfo.GetExecuteMode()) } if taskMaster != nil { masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), taskMaster) if err := taskMaster.SubmitInstance(msg.Ctx, jobInstanceInfo); err != nil { return err } logger.Infof("Submit jobInstanceId=%d succeed", req.GetJobInstanceId()) } } } return nil } func convert2JobInstanceInfo(req *schedulerx.ServerSubmitJobInstanceRequest) *common.JobInstanceInfo { jobInstanceInfo := new(common.JobInstanceInfo) jobInstanceInfo.SetJobId(req.GetJobId()) jobInstanceInfo.SetJobInstanceId(req.GetJobInstanceId()) jobInstanceInfo.SetExecuteMode(req.GetExecuteMode()) jobInstanceInfo.SetJobType(req.GetJobType()) jobInstanceInfo.SetContent(req.GetContent()) jobInstanceInfo.SetUser(req.GetUser()) jobInstanceInfo.SetScheduleTime(time.UnixMilli(req.GetScheduleTime())) jobInstanceInfo.SetDataTime(time.UnixMilli(req.GetDataTime())) jobInstanceInfo.SetAllWorkers(utils.ShuffleStringSlice(req.GetWorkers())) jobInstanceInfo.SetJobConcurrency(req.GetJobConcurrency()) jobInstanceInfo.SetRegionId(req.GetRegionId()) jobInstanceInfo.SetAppGroupId(req.GetAppGroupId()) jobInstanceInfo.SetTimeType(req.GetTimeType()) jobInstanceInfo.SetTimeExpression(req.GetTimeExpression()) jobInstanceInfo.SetGroupId(req.GetGroupId()) jobInstanceInfo.SetTriggerType(req.GetTriggerType()) jobInstanceInfo.SetParameters(req.GetParameters()) jobInstanceInfo.SetXattrs(req.GetXattrs()) jobInstanceInfo.SetInstanceParameters(req.GetInstanceParameters()) jobInstanceInfo.SetUpstreamData(convert2JobInstanceData(req.GetUpstreamData())) jobInstanceInfo.SetMaxAttempt(req.GetMaxAttempt()) jobInstanceInfo.SetAttempt(req.GetAttempt()) jobInstanceInfo.SetWfInstanceId(req.GetWfInstanceId()) jobInstanceInfo.SetJobName(req.GetJobName()) return jobInstanceInfo } func convert2JobInstanceData(datas []*schedulerx.UpstreamData) []*common.JobInstanceData { var ret []*common.JobInstanceData for _, data := range datas { tmp := new(common.JobInstanceData) tmp.SetData(data.GetData()) tmp.SetJobName(data.GetJobName()) ret = append(ret, tmp) } return ret } func (a *jobInstanceActor) handleKillJobInstance(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) error { var ( taskMasterPool = masterpool.GetTaskMasterPool() req = msg.Msg.(*schedulerx.ServerKillJobInstanceRequest) ) logger.Infof("handleKillJobInstance, jobInstanceId=%d ", req.GetJobInstanceId()) if !taskMasterPool.Contains(req.GetJobInstanceId()) { errMsg := fmt.Sprintf("%d is not exist", req.GetJobInstanceId()) logger.Infof(errMsg) resp := &schedulerx.ServerKillJobInstanceResponse{ Success: proto.Bool(true), Message: proto.String(errMsg), } actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) } else { if taskMaster := masterpool.GetTaskMasterPool().Get(req.GetJobInstanceId()); taskMaster != nil { if err := taskMaster.KillInstance("killed from server"); err != nil { logger.Infof("%d killed from server failed, err=%s", req.GetJobInstanceId(), err.Error()) } } errMsg := fmt.Sprintf("%d killed from server", req.GetJobInstanceId()) logger.Infof(errMsg) resp := &schedulerx.ServerKillJobInstanceResponse{ Success: proto.Bool(false), // FIXME true or false Message: proto.String(errMsg), } actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) logger.Infof("Kill jobInstanceId=%d succeed", req.GetJobInstanceId()) } return nil } func (a *jobInstanceActor) handleRetryTasks(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) { req := msg.Msg.(*schedulerx.ServerRetryTasksRequest) logger.Infof("handleRetryTasks, jobInstanceId=%d", req.GetJobInstanceId()) var ( jobInstanceInfo = convertServerRetryTasksRequest2JobInstanceInfo(req) ) if taskMaster := a.taskmasterPool.Get(jobInstanceInfo.GetJobInstanceId()); taskMaster != nil { if parallelTaskMaster, ok := taskMaster.(taskmaster.ParallelTaskMaster); ok { parallelTaskMaster.RetryTasks(req.GetRetryTaskEntity()) resp := &schedulerx.ServerRetryTasksResponse{ Success: proto.Bool(true), } actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) } } else { var newTaskMaster taskmaster.TaskMaster switch common.ExecuteMode(jobInstanceInfo.GetExecuteMode()) { case common.StandaloneExecuteMode: newTaskMaster = master.NewStandaloneTaskMaster(jobInstanceInfo, actorCtx) case common.BroadcastExecuteMode: newTaskMaster = master.NewBroadcastTaskMaster(jobInstanceInfo, actorCtx) case common.BatchExecuteMode: newTaskMaster = master.NewBatchTaskMaster(jobInstanceInfo, actorCtx) case common.ParallelExecuteMode: newTaskMaster = master.NewParallelTaskMaster(jobInstanceInfo, actorCtx) case common.GridExecuteMode: newTaskMaster = master.NewGridTaskMaster(jobInstanceInfo, actorCtx) case common.ShardingExecuteMode: taskMaster = master.NewShardingTaskMaster(jobInstanceInfo, actorCtx) default: logger.Errorf("handleRetryTasks failed, jobInstanceId=%d, unknown executeMode=%s", jobInstanceInfo.GetExecuteMode()) } if newTaskMaster != nil { masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), newTaskMaster) } } } func convertServerRetryTasksRequest2JobInstanceInfo(req *schedulerx.ServerRetryTasksRequest) *common.JobInstanceInfo { jobInstanceInfo := new(common.JobInstanceInfo) jobInstanceInfo.SetJobId(req.GetJobId()) jobInstanceInfo.SetJobInstanceId(req.GetJobInstanceId()) jobInstanceInfo.SetExecuteMode(req.GetExecuteMode()) jobInstanceInfo.SetJobType(req.GetJobType()) jobInstanceInfo.SetContent(req.GetContent()) jobInstanceInfo.SetUser(req.GetUser()) jobInstanceInfo.SetScheduleTime(time.UnixMilli(req.GetScheduleTime())) jobInstanceInfo.SetDataTime(time.UnixMilli(req.GetDataTime())) jobInstanceInfo.SetAllWorkers(utils.ShuffleStringSlice(req.GetWorkers())) jobInstanceInfo.SetJobConcurrency(req.GetJobConcurrency()) jobInstanceInfo.SetRegionId(req.GetRegionId()) jobInstanceInfo.SetAppGroupId(req.GetAppGroupId()) jobInstanceInfo.SetTimeType(req.GetTimeType()) jobInstanceInfo.SetTimeExpression(req.GetTimeExpression()) jobInstanceInfo.SetGroupId(req.GetGroupId()) jobInstanceInfo.SetParameters(req.GetParameters()) jobInstanceInfo.SetXattrs(req.GetXattrs()) jobInstanceInfo.SetInstanceParameters(req.GetInstanceParameters()) jobInstanceInfo.SetUpstreamData(convert2JobInstanceData(req.GetUpstreamData())) jobInstanceInfo.SetMaxAttempt(req.GetMaxAttempt()) jobInstanceInfo.SetAttempt(req.GetAttempt()) jobInstanceInfo.SetWfInstanceId(req.GetWfInstanceId()) jobInstanceInfo.SetJobName(req.GetJobName()) return jobInstanceInfo } func (a *jobInstanceActor) handleKillTask(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) { req := msg.Msg.(*schedulerx.ServerKillTaskRequest) logger.Infof("handleKillTask, jobInstanceId=%d", req.GetJobInstanceId()) var ( resp = new(schedulerx.ServerKillTaskResponse) jobInstanceId = req.GetJobInstanceId() ) if !a.taskmasterPool.Contains(jobInstanceId) { resp = &schedulerx.ServerKillTaskResponse{ Success: proto.Bool(false), Message: proto.String(fmt.Sprintf("jobInstanceId=%d is not existed", jobInstanceId)), } } else { uniqueId := utils.GetUniqueId(req.GetJobId(), req.GetJobInstanceId(), req.GetTaskId()) if mapMaster, ok := a.taskmasterPool.Get(jobInstanceId).(taskmaster.MapTaskMaster); ok { mapMaster.KillTask(uniqueId, req.GetWorkerId(), req.GetWorkerAddr()) resp = &schedulerx.ServerKillTaskResponse{ Success: proto.Bool(true), } } else { logger.Warnf("taskmaster get form taskmasterPool is not mapTaskMaster, jobInstanceId=%d, taskmaster=%+v", jobInstanceId, mapMaster) } } actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) } func (a *jobInstanceActor) handCheckTaskMaster(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) { req := msg.Msg.(*schedulerx.ServerCheckTaskMasterRequest) logger.Infof("handCheckTaskMaster, jobInstanceId=%d", req.GetJobInstanceId()) var ( resp = new(schedulerx.ServerCheckTaskMasterResponse) jobInstanceId = req.GetJobInstanceId() ) if !a.taskmasterPool.Contains(jobInstanceId) { resp = &schedulerx.ServerCheckTaskMasterResponse{ Success: proto.Bool(false), Message: proto.String(fmt.Sprintf("TaskMaster is not existed of jobInstance=%d", jobInstanceId)), } } else { resp = &schedulerx.ServerCheckTaskMasterResponse{ Success: proto.Bool(true), } } actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) } func (a *jobInstanceActor) handleInitPull(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) { req := msg.Msg.(*schedulerx.MasterNotifyWorkerPullRequest) logger.Infof("handleInitPull, jobInstanceId=%d", req.GetJobInstanceId()) // Pull 模式将要废弃,所以这里不处理直接返回 resp := &schedulerx.MasterNotifyWorkerPullResponse{Success: proto.Bool(true)} actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) } func (a *jobInstanceActor) handleReportWorkerStatus(actorCtx actor.Context, msg interface{}) { go func() { for i := 0; i < 3; i++ { f := actorCtx.RequestFuture(actorcomm.SchedulerxServerPid(context.Background()), msg, 5*time.Second) if err := f.Wait(); err != nil { logger.Warnf("handleReportWorkerStatus failed, retry times=%d, err=%s", i, err.Error()) time.Sleep(30 * time.Second) continue } break } }() }