func DecodeAkkaMessage()

in internal/remoting/codec/akka_codec.go [128:239]


// DecodeAkkaMessage decodes an Akka protocol message into either its control
// instruction or the schedulerx message carried inside its envelope.
//
// The payload is always unmarshalled as an AckAndEnvelopeContainer first.
// After that:
//
//   - If msg.Instruction is set and its command type is ASSOCIATE,
//     DISASSOCIATE, HEARTBEAT, DISASSOCIATE_SHUTTING_DOWN or
//     DISASSOCIATE_QUARANTINED, the instruction itself is returned together
//     with an empty sender path. Any other command type is an error.
//   - Otherwise, if the envelope carries both a sender and a message, the
//     message type is resolved from the manifest according to the serializer
//     ID (2 or 6), the matching schedulerx message is unmarshalled, and it is
//     returned together with the sender's path.
//
// On every failure the returned message is nil, the sender path is empty and
// the returned error is non-nil.
func DecodeAkkaMessage(msg *akka.AkkaProtocolMessage) (interface{}, string, error) {
	if msg == nil {
		return nil, "", fmt.Errorf("decode failed: nil akka protocol message")
	}

	envelopeContainer := new(akka.AckAndEnvelopeContainer)
	if err := proto.Unmarshal(msg.Payload, envelopeContainer); err != nil {
		return nil, "", err
	}

	// Control instructions take precedence over any envelope content.
	if msg.Instruction != nil {
		switch msg.Instruction.GetCommandType() {
		case akka.CommandType_ASSOCIATE,
			akka.CommandType_DISASSOCIATE,
			akka.CommandType_HEARTBEAT,
			akka.CommandType_DISASSOCIATE_SHUTTING_DOWN,
			akka.CommandType_DISASSOCIATE_QUARANTINED:
			return msg.GetInstruction(), "", nil
		default:
			return nil, "", fmt.Errorf("Unknown msg instruction=%s, decode failed ", msg.Instruction.GetCommandType())
		}
	}

	envelope := envelopeContainer.Envelope
	if envelope == nil || envelope.Sender == nil || envelope.Message == nil {
		return nil, "", fmt.Errorf("Unknown message=%+v, decode failed ", msg)
	}
	// SerializerId is a pointer field; guard it so a missing value yields an
	// error instead of a nil-pointer panic.
	if envelope.Message.SerializerId == nil {
		return nil, "", fmt.Errorf("decode failed: missing serializerId in envelope.Message")
	}

	// Step 1: resolve the message type name and the raw bytes to decode.
	var (
		msgType    string
		msgRawData []byte
		err        error
	)
	switch serializerID := *envelope.Message.SerializerId; serializerID {
	case 2:
		// use ProtobufSerializer
		// ref: https://github.com/akka/akka/blob/main/akka-remote/src/main/resources/reference.conf#L110
		msgType, err = utils.GetMsgType(string(envelope.Message.MessageManifest))
		if err != nil {
			return nil, "", fmt.Errorf("Get message type from manifest failed, err=%w ", err)
		}
		msgRawData = envelope.Message.Message
	case 6:
		// use MessageContainerSerializer
		// ref: https://github.com/akka/akka/blob/main/akka-remote/src/main/resources/reference.conf#L112
		innerMsg := new(akka.SelectionEnvelope)
		if err := proto.Unmarshal(envelope.Message.Message, innerMsg); err != nil {
			return nil, "", fmt.Errorf("Unmarshal envelope.Message.Message to SelectionEnvelope failed, err=%w ", err)
		}
		msgType, err = utils.GetMsgType(string(innerMsg.MessageManifest))
		if err != nil {
			return nil, "", fmt.Errorf("Get message type from manifest failed, err=%w ", err)
		}
		msgRawData = innerMsg.EnclosedMessage
	default:
		return nil, "", fmt.Errorf("Unknown serializerId=%d in envelope.Message.Message ", serializerID)
	}

	// Step 2: pick the concrete schedulerx message for the resolved type.
	var decoded proto.Message
	switch msgType {
	case "WorkerHeartBeatResponse":
		decoded = new(schedulerx.WorkerHeartBeatResponse)
	case "ServerSubmitJobInstanceRequest":
		decoded = new(schedulerx.ServerSubmitJobInstanceRequest)
	case "ServerKillJobInstanceRequest":
		decoded = new(schedulerx.ServerKillJobInstanceRequest)
	case "ServerKillTaskRequest":
		decoded = new(schedulerx.ServerKillTaskRequest)
	case "ServerCheckTaskMasterRequest":
		decoded = new(schedulerx.ServerCheckTaskMasterRequest)
	case "MasterNotifyWorkerPullRequest":
		decoded = new(schedulerx.MasterNotifyWorkerPullRequest)
	case "ServerThreadDumpRequest":
		decoded = new(schedulerx.ServerThreadDumpRequest)
	case "ServerCallbackCalendarRequest":
		decoded = new(schedulerx.ServerCallbackCalendarRequest)
	case "WorkerReportJobInstanceStatusResponse":
		decoded = new(schedulerx.WorkerReportJobInstanceStatusResponse)
	default:
		return nil, "", fmt.Errorf("Unknown message type=%s, decode failed ", msgType)
	}

	// Step 3: decode the raw bytes into the selected message.
	if err := proto.Unmarshal(msgRawData, decoded); err != nil {
		return nil, "", err
	}
	return decoded, envelope.Sender.GetPath(), nil
}