in internal/actor/task_actor_processor.go [46:127]
func (p *taskProcessor) SendUserMessage(pid *actor.PID, message interface{}) {
if actorcomm.IsSchedulerxServer(pid) {
var (
akkaMsg *akka.AkkaProtocolMessage
err error
)
wrappedMsg, ok := message.(*actorcomm.SchedulerWrappedMsg)
if !ok {
logger.Errorf("Get unknown message, msg=%+v", wrappedMsg)
return
}
conn, err := p.connpool.Get(wrappedMsg.Ctx)
if err != nil {
logger.Errorf("Get conn from pool failed, err=%s", err.Error())
return
}
switch msg := wrappedMsg.Msg.(type) {
case *schedulerx.WorkerReportJobInstanceProgressRequest:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
fmt.Sprintf("akka.tcp://server@%s/", conn.RemoteAddr().String()),
"",
"com.alibaba.schedulerx.protocol.Worker$WorkerReportJobInstanceProgressRequest",
codec.WithMessageContainerSerializer(),
codec.WithSelectionEnvelopePattern([]*akka.Selection{
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("user"),
},
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("map_master_router"),
},
}))
case *schedulerx.WorkerBatchUpdateTaskStatusRequest:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
fmt.Sprintf("akka.tcp://server@%s/user/map_master_router", conn.RemoteAddr().String()),
"",
"com.alibaba.schedulerx.protocol.Worker$WorkerBatchUpdateTaskStatusRequest",
codec.WithMessageContainerSerializer(),
codec.WithSelectionEnvelopePattern([]*akka.Selection{
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("user"),
},
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("map_master_router"),
},
}))
case *schedulerx.WorkerQueryJobInstanceStatusRequest:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
fmt.Sprintf("akka.tcp://server@%s/user/map_master_router", conn.RemoteAddr().String()),
"",
"com.alibaba.schedulerx.protocol.Worker$WorkerQueryJobInstanceStatusRequest",
codec.WithMessageContainerSerializer(),
codec.WithSelectionEnvelopePattern([]*akka.Selection{
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("user"),
},
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("map_master_router"),
},
}))
default:
logger.Errorf("Unknown akka message type=%+v", msg)
return
}
if err != nil {
logger.Errorf("Encode akka message failed, err=%s", err.Error())
return
}
if err := trans.WriteAkkaMsg(akkaMsg, conn); err != nil {
logger.Errorf("Write akka message failed, err=%s", err.Error())
return
}
}
}