func()

in internal/actor/task_actor_processor.go [46:127]


func (p *taskProcessor) SendUserMessage(pid *actor.PID, message interface{}) {
	if actorcomm.IsSchedulerxServer(pid) {
		var (
			akkaMsg *akka.AkkaProtocolMessage
			err     error
		)
		wrappedMsg, ok := message.(*actorcomm.SchedulerWrappedMsg)
		if !ok {
			logger.Errorf("Get unknown message, msg=%+v", wrappedMsg)
			return
		}
		conn, err := p.connpool.Get(wrappedMsg.Ctx)
		if err != nil {
			logger.Errorf("Get conn from pool failed, err=%s", err.Error())
			return
		}
		switch msg := wrappedMsg.Msg.(type) {
		case *schedulerx.WorkerReportJobInstanceProgressRequest:
			akkaMsg, err = codec.EncodeAkkaMessage(
				msg,
				fmt.Sprintf("akka.tcp://server@%s/", conn.RemoteAddr().String()),
				"",
				"com.alibaba.schedulerx.protocol.Worker$WorkerReportJobInstanceProgressRequest",
				codec.WithMessageContainerSerializer(),
				codec.WithSelectionEnvelopePattern([]*akka.Selection{
					{
						Type:    akka.PatternType_CHILD_NAME.Enum(),
						Matcher: proto.String("user"),
					},
					{
						Type:    akka.PatternType_CHILD_NAME.Enum(),
						Matcher: proto.String("map_master_router"),
					},
				}))
		case *schedulerx.WorkerBatchUpdateTaskStatusRequest:
			akkaMsg, err = codec.EncodeAkkaMessage(
				msg,
				fmt.Sprintf("akka.tcp://server@%s/user/map_master_router", conn.RemoteAddr().String()),
				"",
				"com.alibaba.schedulerx.protocol.Worker$WorkerBatchUpdateTaskStatusRequest",
				codec.WithMessageContainerSerializer(),
				codec.WithSelectionEnvelopePattern([]*akka.Selection{
					{
						Type:    akka.PatternType_CHILD_NAME.Enum(),
						Matcher: proto.String("user"),
					},
					{
						Type:    akka.PatternType_CHILD_NAME.Enum(),
						Matcher: proto.String("map_master_router"),
					},
				}))
		case *schedulerx.WorkerQueryJobInstanceStatusRequest:
			akkaMsg, err = codec.EncodeAkkaMessage(
				msg,
				fmt.Sprintf("akka.tcp://server@%s/user/map_master_router", conn.RemoteAddr().String()),
				"",
				"com.alibaba.schedulerx.protocol.Worker$WorkerQueryJobInstanceStatusRequest",
				codec.WithMessageContainerSerializer(),
				codec.WithSelectionEnvelopePattern([]*akka.Selection{
					{
						Type:    akka.PatternType_CHILD_NAME.Enum(),
						Matcher: proto.String("user"),
					},
					{
						Type:    akka.PatternType_CHILD_NAME.Enum(),
						Matcher: proto.String("map_master_router"),
					},
				}))
		default:
			logger.Errorf("Unknown akka message type=%+v", msg)
			return
		}
		if err != nil {
			logger.Errorf("Encode akka message failed, err=%s", err.Error())
			return
		}
		if err := trans.WriteAkkaMsg(akkaMsg, conn); err != nil {
			logger.Errorf("Write akka message failed, err=%s", err.Error())
			return
		}
	}
}