internal/actor/init.go (95 lines of code) (raw):

/* * Copyright (c) 2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package actor import ( "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/remote" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/alibaba/schedulerx-worker-go/config" "github.com/alibaba/schedulerx-worker-go/internal/actor/common" "github.com/alibaba/schedulerx-worker-go/internal/utils" ) func InitActors(actorSystem *actor.ActorSystem) error { // init containerActor containerRouterPid, err := actorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor { return newContainerActor() }), actorcomm.ContainerRouterPidId) if err != nil { return err } // init jobInstanceActor jobInstancePid, err := actorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor { return newJobInstanceActor(actorSystem) }), actorcomm.JobInstancePidId) if err != nil { return err } // init atLeastOnceDeliveryActor atLeastOnceDeliveryPid, err := actorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor { return newAtLeastOnceDeliveryRoutingActor() }), actorcomm.AtLeastOnceDeliveryPidId) if err != nil { return err } // init taskActor taskActorPid, err := actorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor { return newTaskActor(actorSystem) }), actorcomm.MapMasterPidId) if err != nil { return err } // init heartbeatActor heartbeatActorPid, err := actorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor { return newHeartbeatActor(actorSystem) }), actorcomm.HeartbeatPidId) if err != nil { return err } go func() { for { select { case sxMsg := <-actorcomm.SxMsgReceiver(): actorSystem.Root.Send(jobInstancePid, sxMsg) case taskMasterMsg := <-actorcomm.TaskMasterMsgReceiver(): actorSystem.Root.Send(taskActorPid, taskMasterMsg) case containerRouterMsg := <-actorcomm.ContainerRouterMsgReceiver(): actorSystem.Root.Send(containerRouterPid, containerRouterMsg) case atLeastOnceDeliveryMsg := <-actorcomm.AtLeastOnceDeliveryMsgReceiver(): actorSystem.Root.Send(atLeastOnceDeliveryPid, atLeastOnceDeliveryMsg) case heartbeatMsg := <-actorcomm.HeartbeatMsgReceiver(): actorSystem.Root.Send(heartbeatActorPid, heartbeatMsg) } } }() var ( host = "0.0.0.0" port = 0 // random port ) if grpcPort := config.GetWorkerConfig().GrpcPort(); grpcPort != 0 { port = int(grpcPort) } if config.GetWorkerConfig().Iface() != "" { localHost, err := utils.GetIpv4AddrByIface(config.GetWorkerConfig().Iface()) if err != nil { panic(err) } host = localHost } else { localHost, err := utils.GetIpv4AddrHost() if err != nil { panic(err) } host = localHost } // The maximum limit for a subtask is 64kb, and a maximum of 1000 batches can be sent together, which is 64MB, // plus about 200MB for serialization and request headers. remoteConfig := remote.Configure(host, port, remote.WithDialOptions( grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(200*1024*1024), grpc.MaxCallSendMsgSize(200*1024*1024), )), remote.WithServerOptions( grpc.MaxRecvMsgSize(200*1024*1024), grpc.MaxSendMsgSize(200*1024*1024), )) remoting := remote.NewRemote(actorSystem, remoteConfig) // Warning: must start remoting last, because it would register a default address resolver, // will it overwrite what we have registered ourselves. remoting.Register(actorcomm.ContainerRouterPidId, actor.PropsFromProducer(func() actor.Actor { return newContainerActor() })) remoting.Start() return nil }