internal/actor/task_actor.go (211 lines of code) (raw):

/* * Copyright (c) 2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package actor import ( "context" "fmt" "time" "github.com/asynkron/protoactor-go/actor" "google.golang.org/protobuf/proto" "github.com/alibaba/schedulerx-worker-go/internal/actor/common" "github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster" "github.com/alibaba/schedulerx-worker-go/internal/masterpool" "github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx" "github.com/alibaba/schedulerx-worker-go/internal/remoting/pool" "github.com/alibaba/schedulerx-worker-go/logger" "github.com/alibaba/schedulerx-worker-go/processor" ) var _ actor.Actor = &taskActor{} // taskActor is the type taskActor struct { connpool pool.ConnPool taskMasterPool *masterpool.TaskMasterPool } func newTaskActor(actorSystem *actor.ActorSystem) *taskActor { tActor := &taskActor{ connpool: pool.GetConnPool(), taskMasterPool: masterpool.GetTaskMasterPool(), } resolver := func(pid *actor.PID) (actor.Process, bool) { if actorcomm.IsSchedulerxServer(pid) { return newTaskProcessor(tActor.connpool), true } // If communicate with actors other than server, then use the default handler (return false) return nil, false } actorSystem.ProcessRegistry.RegisterAddressResolver(resolver) return tActor } func (a *taskActor) Receive(ctx actor.Context) { switch msg := ctx.Message().(type) { case *schedulerx.ContainerReportTaskStatusRequest: if err := a.handleTaskStatus(msg); err != nil { logger.Errorf("handleTaskStatus failed, err=%s", err.Error()) } case *schedulerx.ContainerBatchReportTaskStatuesRequest: // send to atLeastOnceDeliveryRoutingActor a.handleBatchTaskStatues(ctx, msg) case *schedulerx.WorkerMapTaskRequest: if err := a.handleMapTask(ctx, msg); err != nil { logger.Errorf("handleMapTask failed, err=%s", err.Error()) } case *schedulerx.WorkerMapTaskResponse: actorcomm.WorkerMapTaskRespMsgSender() <- msg case *schedulerx.PullTaskFromMasterRequest: a.handlePullTasks(ctx, msg) case *actorcomm.SchedulerWrappedMsg: switch innerMsg := msg.Msg.(type) { case *schedulerx.ContainerBatchReportTaskStatuesRequest: // send to atLeastOnceDeliveryRoutingActor a.handleBatchTaskStatues(ctx, innerMsg) case *schedulerx.WorkerMapTaskResponse: actorcomm.WorkerMapTaskRespMsgSender() <- innerMsg case *schedulerx.PullTaskFromMasterRequest: a.handlePullTasks(ctx, innerMsg) case *schedulerx.WorkerReportJobInstanceProgressRequest: // forward to server ctx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), msg) case *schedulerx.WorkerBatchUpdateTaskStatusRequest: // forward to server serverPid := actorcomm.SchedulerxServerPid(context.Background()) result, err := ctx.RequestFuture(serverPid, msg, 5*time.Second).Result() if err != nil { logger.Errorf("Send WorkerBatchUpdateTaskStatusRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address) } else { actorcomm.WorkerBatchUpdateTaskStatusRespMsgSender() <- result.(*schedulerx.WorkerBatchUpdateTaskStatusResponse) } case *schedulerx.WorkerQueryJobInstanceStatusRequest: // forward to server serverPid := actorcomm.SchedulerxServerPid(context.Background()) result, err := ctx.RequestFuture(serverPid, msg, 30*time.Second).Result() if err != nil { logger.Errorf("Send WorkerQueryJobInstanceStatusRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address) } else { actorcomm.WorkerQueryJobInstanceStatusRespMsgSender() <- result.(*schedulerx.WorkerQueryJobInstanceStatusResponse) } case *schedulerx.WorkerClearTasksRequest: // forward to server serverPid := actorcomm.SchedulerxServerPid(context.Background()) result, err := ctx.RequestFuture(serverPid, msg, 5*time.Second).Result() if err != nil { logger.Errorf("Send WorkerClearTasksRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address) } else { actorcomm.WorkerClearTasksRespMsgSender() <- result.(*schedulerx.WorkerClearTasksResponse) } case *schedulerx.WorkerBatchCreateTasksRequest: // forward to server serverPid := actorcomm.SchedulerxServerPid(context.Background()) result, err := ctx.RequestFuture(serverPid, msg, 90*time.Second).Result() if err != nil { logger.Errorf("Send WorkerBatchCreateTasksRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address) } else { actorcomm.WorkerBatchCreateTasksRespMsgSender() <- result.(*schedulerx.WorkerBatchCreateTasksResponse) } case *schedulerx.WorkerPullTasksRequest: // forward to server serverPid := actorcomm.SchedulerxServerPid(context.Background()) result, err := ctx.RequestFuture(serverPid, msg, 30*time.Second).Result() if err != nil { logger.Errorf("Send WorkerPullTasksRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address) } else { actorcomm.WorkerPullTasksRespMsgSender() <- result.(*schedulerx.WorkerPullTasksResponse) } case *schedulerx.WorkerBatchReportTaskStatuesRequest: // forward to server serverPid := actorcomm.SchedulerxServerPid(context.Background()) ctx.Send(serverPid, msg) case *schedulerx.WorkerReportTaskListStatusRequest: serverPid := actorcomm.SchedulerxServerPid(context.Background()) result, err := ctx.RequestFuture(serverPid, innerMsg, 30*time.Second).Result() if err != nil { logger.Errorf("Send WorkerReportTaskListStatusRequest timeout, jobInstanceId=%d, serverAddr=%s", innerMsg.JobInstanceId, serverPid.Address) } else { actorcomm.WorkerReportTaskListStatusRespMsgSender() <- result.(*schedulerx.WorkerReportTaskListStatusResponse) } default: logger.Errorf("Receive unknown message in taskActor, msg=%+v", msg) } } } func (a *taskActor) handleTaskStatus(req *schedulerx.ContainerReportTaskStatusRequest) error { if taskMaster := a.taskMasterPool.Get(req.GetJobInstanceId()); taskMaster != nil { if err := taskMaster.UpdateTaskStatus(req); err != nil { return fmt.Errorf("jobInstanceId=%v, taskId=%v", req.GetJobInstanceId(), req.GetTaskId()) } } return nil } func (a *taskActor) handleBatchTaskStatues(actorCtx actor.Context, req *schedulerx.ContainerBatchReportTaskStatuesRequest) { logger.Debugf("[taskActor] handleBatchTaskStatues, jobInstanceId=%v, batch receive task status reqs, size:%v, taskMasterPool=%v", req.GetJobInstanceId(), len(req.GetTaskStatues()), masterpool.GetTaskMasterPool()) if taskMaster := masterpool.GetTaskMasterPool().Get(req.GetJobInstanceId()); taskMaster != nil { if err := taskMaster.BatchUpdateTaskStatus(taskMaster, req); err != nil { logger.Warnf("[taskActor] TaskMaster BatchUpdateTaskStatus failed in handleBatchTaskStatues, req=%+v, err=%s", req, err.Error()) } } response := &schedulerx.ContainerBatchReportTaskStatuesResponse{ Success: proto.Bool(true), DeliveryId: proto.Int64(req.GetDeliveryId()), } if senderPid := actorCtx.Sender(); senderPid != nil { actorCtx.Request(senderPid, response) } else { logger.Warnf("Cannot send ContainerBatchReportTaskStatuesResponse due to sender is unknown in handleBatchTaskStatues of taskActor, request=%+v", req) } } func (a *taskActor) handleMapTask(actorCtx actor.Context, req *schedulerx.WorkerMapTaskRequest) error { var ( jobInstanceId = req.GetJobInstanceId() response = new(schedulerx.WorkerMapTaskResponse) ) if taskMaster := a.taskMasterPool.Get(jobInstanceId); taskMaster != nil { if mapTaskMaster, ok := taskMaster.(taskmaster.MapTaskMaster); ok { startTime := time.Now().UnixMilli() overload, err := mapTaskMaster.Map(nil, req.GetTaskBody(), req.GetTaskName()) if err != nil { errMsg := fmt.Sprintf("handleMapTask failed, due to jobInstanceId=%v map error, err=%v", jobInstanceId, err.Error()) logger.Errorf(errMsg) taskMaster.UpdateNewInstanceStatus(taskMaster.GetSerialNum(), processor.InstanceStatusFailed, errMsg) return fmt.Errorf(errMsg) } logger.Debugf("jobInstanceId=%v map, cost=%vms", jobInstanceId, time.Now().UnixMilli()-startTime) response = &schedulerx.WorkerMapTaskResponse{ Success: proto.Bool(true), Overload: proto.Bool(overload), } } else { response = &schedulerx.WorkerMapTaskResponse{ Success: proto.Bool(false), Message: proto.String("TaskMaster is not MapTaskMaster"), } taskMaster.UpdateNewInstanceStatus(taskMaster.GetSerialNum(), processor.InstanceStatusFailed, "TaskMaster is not MapTaskMaster") } } else { response = &schedulerx.WorkerMapTaskResponse{ Success: proto.Bool(false), Message: proto.String(fmt.Sprintf("can't found TaskMaster by jobInstanceId=%d", jobInstanceId)), } } if senderPid := actorCtx.Sender(); senderPid != nil { actorCtx.Send(senderPid, response) } else { logger.Warnf("Cannot send WorkerMapTaskResponse due to sender is unknown in handleMapTask of taskActor, request=%+v", req) } return nil } func (a *taskActor) handlePullTasks(actorCtx actor.Context, req *schedulerx.PullTaskFromMasterRequest) { var ( jobInstanceId = req.GetJobInstanceId() response = new(schedulerx.PullTaskFromMasterResponse) ) if taskMaster := a.taskMasterPool.Get(jobInstanceId); taskMaster != nil { if mapTaskMaster, ok := taskMaster.(taskmaster.MapTaskMaster); ok { response = &schedulerx.PullTaskFromMasterResponse{ Success: proto.Bool(true), Request: mapTaskMaster.SyncPullTasks(req.GetPageSize(), req.GetWorkerIdAddr()), } } else { response = &schedulerx.PullTaskFromMasterResponse{ Success: proto.Bool(false), Message: proto.String(fmt.Sprintf("%v%v", "TaskMaster is not MapTaskMaster, jobInstanceId=", jobInstanceId)), } } } else { response = &schedulerx.PullTaskFromMasterResponse{ Success: proto.Bool(false), Message: proto.String(fmt.Sprintf("%v%v", "TaskMaster is null, jobInstanceId=", jobInstanceId)), } } if senderPid := actorCtx.Sender(); senderPid != nil { actorCtx.Send(senderPid, response) } else { logger.Warnf("Cannot send PullTaskFromMasterResponse due to sender is unknown in handlePullTasks of taskActor, request=%+v", req) } }