internal/actor/heartbeat_actor.go (77 lines of code) (raw):

/* * Copyright (c) 2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package actor import ( "fmt" "github.com/asynkron/protoactor-go/actor" "google.golang.org/protobuf/proto" "github.com/alibaba/schedulerx-worker-go/internal/actor/common" "github.com/alibaba/schedulerx-worker-go/internal/common" "github.com/alibaba/schedulerx-worker-go/internal/masterpool" "github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx" "github.com/alibaba/schedulerx-worker-go/internal/remoting/pool" "github.com/alibaba/schedulerx-worker-go/logger" ) var _ actor.Actor = &heartbeatActor{} // heartbeatActor is the type heartbeatActor struct { connpool pool.ConnPool taskMasterPool *masterpool.TaskMasterPool } func newHeartbeatActor(actorSystem *actor.ActorSystem) *heartbeatActor { tActor := &heartbeatActor{ connpool: pool.GetConnPool(), taskMasterPool: masterpool.GetTaskMasterPool(), } resolver := func(pid *actor.PID) (actor.Process, bool) { if actorcomm.IsSchedulerxServer(pid) { return newHeartbeatProcessor(tActor.connpool), true } // If communicate with actors other than server, then use the default handler (return false) return nil, false } actorSystem.ProcessRegistry.RegisterAddressResolver(resolver) return tActor } func (a *heartbeatActor) Receive(ctx actor.Context) { switch msg := ctx.Message().(type) { case *schedulerx.MasterCheckWorkerAliveRequest: a.handleCheckWorkerAlive(ctx, msg) case *schedulerx.ContainerCheckZombieRequest: a.handleCheckZombie(ctx, msg) default: logger.Warnf("[heartbeatActor] receive unknown message, msg=%+v", ctx.Message()) } } func (a *heartbeatActor) handleCheckWorkerAlive(ctx actor.Context, req *schedulerx.MasterCheckWorkerAliveRequest) { resp := &schedulerx.MasterCheckWorkerAliveResponse{ Success: proto.Bool(true), } if req.GetDispatchMode() == string(common.TaskDispatchModePull) { resp = &schedulerx.MasterCheckWorkerAliveResponse{ Success: proto.Bool(false), Message: proto.String(fmt.Sprintf("%d is crashed in PullMananger", req.GetJobInstanceId())), } } else { // FIXME implement metric monitor resp = &schedulerx.MasterCheckWorkerAliveResponse{ Success: proto.Bool(true), } } if senderPid := ctx.Sender(); senderPid != nil { ctx.Send(senderPid, resp) } else { logger.Warnf("Cannot send MasterCheckWorkerAliveRequest due to sender is unknown in handleCheckWorkerAlive of heartbeatActor, request=%+v", req) } } func (a *heartbeatActor) handleCheckZombie(ctx actor.Context, req *schedulerx.ContainerCheckZombieRequest) { zombieJobInstanceIds := make([]int64, 0, len(req.GetJobInstanceId())) for _, jobInstanceId := range req.GetJobInstanceId() { if !a.taskMasterPool.Contains(jobInstanceId) { zombieJobInstanceIds = append(zombieJobInstanceIds, jobInstanceId) } } resp := &schedulerx.ContainerCheckZombieResponse{ ZombieJobInstanceId: zombieJobInstanceIds, } if senderPid := ctx.Sender(); senderPid != nil { ctx.Send(senderPid, resp) } else { logger.Warnf("Cannot send ContainerCheckZombieRequest due to sender is unknown in handleCheckZombie of heartbeatActor, request=%+v", req) } }