func InitActors()

in internal/actor/init.go [30:134]


// InitActors spawns the worker's named root actors (container router, job
// instance, at-least-once delivery, task and heartbeat), starts a goroutine
// that forwards messages from the actorcomm receiver channels to those actors,
// and finally configures and starts remoting on the worker's local address.
//
// It returns an error if any actor cannot be spawned or if the local IPv4
// address cannot be resolved.
func InitActors(actorSystem *actor.ActorSystem) error {
	// init containerActor
	containerRouterPid, err := actorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor {
		return newContainerActor()
	}), actorcomm.ContainerRouterPidId)
	if err != nil {
		return err
	}

	// init jobInstanceActor
	jobInstancePid, err := actorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor {
		return newJobInstanceActor(actorSystem)
	}), actorcomm.JobInstancePidId)
	if err != nil {
		return err
	}

	// init atLeastOnceDeliveryActor
	atLeastOnceDeliveryPid, err := actorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor {
		return newAtLeastOnceDeliveryRoutingActor()
	}), actorcomm.AtLeastOnceDeliveryPidId)
	if err != nil {
		return err
	}

	// init taskActor (registered under the map-master pid id)
	taskActorPid, err := actorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor {
		return newTaskActor(actorSystem)
	}), actorcomm.MapMasterPidId)
	if err != nil {
		return err
	}

	// init heartbeatActor
	heartbeatActorPid, err := actorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor {
		return newHeartbeatActor(actorSystem)
	}), actorcomm.HeartbeatPidId)
	if err != nil {
		return err
	}

	// Bridge the actorcomm channels to the local actors. The loop has no exit
	// path, so this goroutine runs for as long as the process does.
	go func() {
		for {
			select {
			case sxMsg := <-actorcomm.SxMsgReceiver():
				actorSystem.Root.Send(jobInstancePid, sxMsg)
			case taskMasterMsg := <-actorcomm.TaskMasterMsgReceiver():
				actorSystem.Root.Send(taskActorPid, taskMasterMsg)
			case containerRouterMsg := <-actorcomm.ContainerRouterMsgReceiver():
				actorSystem.Root.Send(containerRouterPid, containerRouterMsg)
			case atLeastOnceDeliveryMsg := <-actorcomm.AtLeastOnceDeliveryMsgReceiver():
				actorSystem.Root.Send(atLeastOnceDeliveryPid, atLeastOnceDeliveryMsg)
			case heartbeatMsg := <-actorcomm.HeartbeatMsgReceiver():
				actorSystem.Root.Send(heartbeatActorPid, heartbeatMsg)
			}
		}
	}()

	port := 0 // random port unless one is configured
	if grpcPort := config.GetWorkerConfig().GrpcPort(); grpcPort != 0 {
		port = int(grpcPort)
	}

	// Resolve the address to bind to: the configured interface's IPv4 address
	// when an interface is set, otherwise the host's IPv4 address. A failure is
	// reported to the caller rather than panicking, like every other error here.
	var host string
	if iface := config.GetWorkerConfig().Iface(); iface != "" {
		host, err = utils.GetIpv4AddrByIface(iface)
	} else {
		host, err = utils.GetIpv4AddrHost()
	}
	if err != nil {
		return err
	}

	// A subtask is limited to 64KB and up to 1000 of them may be sent together
	// in one batch (about 64MB); the 200MB gRPC message limit leaves room for
	// serialization and request headers.
	const maxGrpcMsgSize = 200 * 1024 * 1024
	remoteConfig := remote.Configure(host, port,
		remote.WithDialOptions(
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(maxGrpcMsgSize),
				grpc.MaxCallSendMsgSize(maxGrpcMsgSize),
			)),
		remote.WithServerOptions(
			grpc.MaxRecvMsgSize(maxGrpcMsgSize),
			grpc.MaxSendMsgSize(maxGrpcMsgSize),
		))

	remoting := remote.NewRemote(actorSystem, remoteConfig)
	// Warning: remoting must be started last, because it registers a default
	// address resolver, which will overwrite what we have registered ourselves.

	// Register the container actor kind so it can be spawned through remoting.
	remoting.Register(actorcomm.ContainerRouterPidId, actor.PropsFromProducer(func() actor.Actor {
		return newContainerActor()
	}))
	remoting.Start()

	return nil
}