in internal/actor/job_instance_processor.go [47:176]
func (p *jobInstanceProcessor) SendUserMessage(pid *actor.PID, message interface{}) {
if actorcomm.IsSchedulerxServer(pid) {
var (
akkaMsg *akka.AkkaProtocolMessage
err error
)
wrappedMsg, ok := message.(*actorcomm.SchedulerWrappedMsg)
if !ok {
logger.Errorf("Get unknown message, msg=%+v", wrappedMsg)
return
}
conn, err := p.connpool.Get(wrappedMsg.Ctx)
if err != nil {
logger.Errorf("Get conn from pool failed, err=%s", err.Error())
return
}
switch msg := wrappedMsg.Msg.(type) {
case *schedulerx.ServerSubmitJobInstanceResponse:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
wrappedMsg.SenderPath,
fmt.Sprintf("akka.tcp://%s@%s/user/job_instance_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Server$ServerSubmitJobInstanceResponse")
case *schedulerx.ServerKillJobInstanceResponse:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
wrappedMsg.SenderPath,
fmt.Sprintf("akka.tcp://%s@%s/user/job_instance_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Server$ServerKillJobInstanceResponse")
case *schedulerx.MasterKillContainerResponse:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
wrappedMsg.SenderPath,
fmt.Sprintf("akka.tcp://%s@%s/user/job_instance_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Server$MasterKillContainerResponse")
case *schedulerx.ServerKillTaskResponse:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
wrappedMsg.SenderPath,
fmt.Sprintf("akka.tcp://%s@%s/user/job_instance_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Server$ServerKillTaskResponse")
case *schedulerx.WorkerReportJobInstanceStatusRequest:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
fmt.Sprintf("akka.tcp://server@%s/", conn.RemoteAddr().String()),
fmt.Sprintf("akka.tcp://%s@%s/user/at_least_once_delivery_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Worker$WorkerReportJobInstanceStatusRequest",
codec.WithMessageContainerSerializer(),
codec.WithSelectionEnvelopePattern([]*akka.Selection{
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("user"),
},
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("instance_status_router"),
},
}))
case *schedulerx.WorkerBatchReportTaskStatuesRequest:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
fmt.Sprintf("akka.tcp://server@%s/", conn.RemoteAddr().String()),
fmt.Sprintf("akka.tcp://%s@%s/user/at_least_once_delivery_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Worker$WorkerBatchReportTaskStatuesRequest",
codec.WithMessageContainerSerializer(),
codec.WithSelectionEnvelopePattern([]*akka.Selection{
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("user"),
},
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("instance_status_router"),
},
}))
case *schedulerx.WorkerReportJobInstanceProgressRequest:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
fmt.Sprintf("akka.tcp://server@%s/", conn.RemoteAddr().String()),
"",
"com.alibaba.schedulerx.protocol.Worker$WorkerReportJobInstanceProgressRequest",
codec.WithMessageContainerSerializer(),
codec.WithSelectionEnvelopePattern([]*akka.Selection{
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("user"),
},
{
Type: akka.PatternType_CHILD_NAME.Enum(),
Matcher: proto.String("map_master_router"),
},
}))
case *schedulerx.ServerRetryTasksResponse:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
wrappedMsg.SenderPath,
fmt.Sprintf("akka.tcp://%s@%s/user/job_instance_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Server$ServerRetryTasksResponse")
case *schedulerx.ServerCheckTaskMasterResponse:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
wrappedMsg.SenderPath,
fmt.Sprintf("akka.tcp://%s@%s/user/job_instance_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Server$ServerCheckTaskMasterResponse")
case *schedulerx.MasterNotifyWorkerPullResponse:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
wrappedMsg.SenderPath,
fmt.Sprintf("akka.tcp://%s@%s/user/job_instance_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Server$MasterNotifyWorkerPullResponse")
case *schedulerx.WorkerReportTaskListStatusResponse:
akkaMsg, err = codec.EncodeAkkaMessage(
msg,
wrappedMsg.SenderPath,
fmt.Sprintf("akka.tcp://%s@%s/user/job_instance_routing/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl()),
"com.alibaba.schedulerx.protocol.Server$WorkerReportTaskListStatusResponse")
default:
logger.Errorf("Unknown akka message type=%+v", msg)
return
}
if err != nil {
logger.Errorf("Encode akka message failed, err=%s", err.Error())
return
}
if err := trans.WriteAkkaMsg(akkaMsg, conn); err != nil {
logger.Errorf("Write akka message failed, err=%s", err.Error())
return
}
}
}