internal/actor/common/utils.go (57 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package actorcomm
import (
"context"
"fmt"
"net"
"strings"
"github.com/asynkron/protoactor-go/actor"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/pool"
)
const (
workerAkkaPathPrefix = "akka.tcp://"
workerAkkaContainerRouting = "container_routing"
workerAkkaContainerRoutingPath = "/user/" + workerAkkaContainerRouting
schedulerxServerPidId = "schedulerx"
ContainerRouterPidId = "user_container_routing"
JobInstancePidId = "job_instance_routing"
MapMasterPidId = "map_master_router"
AtLeastOnceDeliveryPidId = "at_least_once_delivery_routing"
HeartbeatPidId = "heartbeat_routing"
)
// GetContainerRouterPid get remote PID of container router
func GetContainerRouterPid(workerIdAddr string) *actor.PID {
return actor.NewPID(workerIdAddr, ContainerRouterPidId)
}
// GetMapMasterPid get remote PID of mapMaster router
func GetMapMasterPid(workerIdAddr string) *actor.PID {
return actor.NewPID(workerIdAddr, MapMasterPidId)
}
// GetHeartbeatActorPid get remote PID of heartbeat actor
func GetHeartbeatActorPid(workerIdAddr string) *actor.PID {
return actor.NewPID(workerIdAddr, HeartbeatPidId)
}
func IsSchedulerxServer(pid *actor.PID) bool {
return pid.GetId() == schedulerxServerPidId
}
func SchedulerxServerPid(ctx context.Context) *actor.PID {
conn, err := pool.GetConnPool().Get(ctx)
if err != nil {
return &actor.PID{}
}
return actor.NewPID(conn.RemoteAddr().String(), schedulerxServerPidId)
}
// GetRealWorkerAddr get the real workerAddr, which is the address of the remote worker's ActorSystem
// The workerAddr issued by the server is the address reported by the heartbeat.
// It is the connection address obtained from the connection pool, not the ActorSystem address, so it needs to be converted.
func GetRealWorkerAddr(workerIdAddr string) string {
parts := strings.Split(workerIdAddr, "@")
workerAddr := parts[1]
addrParts := strings.Split(workerAddr, ":")
var (
host, port string
)
// Note: akka_port is used for akka communication between master and server, rpc_port is used for actorSystem communication between master and worker.
// Old format (compatible): host:akka_port
// New format: host:akka_port:rpc_port
if !(len(addrParts) == 2 || len(addrParts) == 3) {
panic(fmt.Sprintf("invalid worker addr: %s", workerAddr))
}
host = addrParts[0]
if len(addrParts) == 2 {
port = addrParts[1]
} else if len(addrParts) == 3 {
port = addrParts[2]
}
return net.JoinHostPort(host, port)
}