in internal/remoting/codec/akka_codec.go [128:239]
// DecodeAkkaMessage decodes an Akka protocol message into either its control
// instruction or the schedulerx message carried inside its payload envelope.
//
// Results:
//   - the decoded value: msg.GetInstruction() for a recognised control command,
//     otherwise a pointer to the schedulerx message named by the manifest;
//   - the sender path read from the envelope (empty for control instructions);
//   - a non-nil error when the message cannot be decoded, in which case the
//     first two results are always nil and "".
func DecodeAkkaMessage(msg *akka.AkkaProtocolMessage) (interface{}, string, error) {
	if msg == nil {
		return nil, "", fmt.Errorf("Nil akka protocol message, decode failed ")
	}

	// The payload is parsed before the instruction is inspected, so a malformed
	// payload is rejected even when an instruction is present.
	envelopeContainer := new(akka.AckAndEnvelopeContainer)
	if err := proto.Unmarshal(msg.Payload, envelopeContainer); err != nil {
		return nil, "", err
	}

	// Control messages carry no envelope; hand the instruction back as-is.
	if msg.Instruction != nil {
		switch msg.Instruction.GetCommandType() {
		case akka.CommandType_ASSOCIATE,
			akka.CommandType_DISASSOCIATE,
			akka.CommandType_HEARTBEAT,
			akka.CommandType_DISASSOCIATE_SHUTTING_DOWN,
			akka.CommandType_DISASSOCIATE_QUARANTINED:
			return msg.GetInstruction(), "", nil
		default:
			return nil, "", fmt.Errorf("Unknown msg instruction=%s, decode failed ", msg.Instruction.GetCommandType())
		}
	}

	envelope := envelopeContainer.Envelope
	if envelope == nil || envelope.Sender == nil || envelope.Message == nil {
		return nil, "", fmt.Errorf("Unknown message=%+v, decode failed ", msg)
	}
	// SerializerId is a pointer field; dereferencing it unset would panic.
	if envelope.Message.SerializerId == nil {
		return nil, "", fmt.Errorf("Missing serializerId in envelope.Message, decode failed ")
	}

	var (
		msgType    string
		msgRawData []byte
		err        error
	)
	switch serializerID := *envelope.Message.SerializerId; serializerID {
	case 2:
		// use ProtobufSerializer
		// ref: https://github.com/akka/akka/blob/main/akka-remote/src/main/resources/reference.conf#L110
		msgType, err = utils.GetMsgType(string(envelope.Message.MessageManifest))
		if err != nil {
			return nil, "", fmt.Errorf("Get message type from manifest failed, err=%w ", err)
		}
		msgRawData = envelope.Message.Message
	case 6:
		// use MessageContainerSerializer: the real message is wrapped in a
		// SelectionEnvelope that carries its own manifest and bytes.
		// ref: https://github.com/akka/akka/blob/main/akka-remote/src/main/resources/reference.conf#L112
		innerMsg := new(akka.SelectionEnvelope)
		if err := proto.Unmarshal(envelope.Message.Message, innerMsg); err != nil {
			return nil, "", fmt.Errorf("Unmarshal envelope.Message.Message to SelectionEnvelope failed, err=%w ", err)
		}
		msgType, err = utils.GetMsgType(string(innerMsg.MessageManifest))
		if err != nil {
			return nil, "", fmt.Errorf("Get message type from manifest failed, err=%w ", err)
		}
		msgRawData = innerMsg.EnclosedMessage
	default:
		return nil, "", fmt.Errorf("Unknown serializerId=%d in envelope.Message.Message ", serializerID)
	}

	decoded := newPayloadMessage(msgType)
	if decoded == nil {
		return nil, "", fmt.Errorf("Unknown message type=%s, decode failed ", msgType)
	}
	if err := proto.Unmarshal(msgRawData, decoded); err != nil {
		return nil, "", err
	}
	return decoded, envelope.Sender.GetPath(), nil
}

// newPayloadMessage returns an empty schedulerx message for the given message
// type name, or a nil interface when the type is not one this codec decodes.
func newPayloadMessage(msgType string) proto.Message {
	switch msgType {
	case "WorkerHeartBeatResponse":
		return new(schedulerx.WorkerHeartBeatResponse)
	case "ServerSubmitJobInstanceRequest":
		return new(schedulerx.ServerSubmitJobInstanceRequest)
	case "ServerKillJobInstanceRequest":
		return new(schedulerx.ServerKillJobInstanceRequest)
	case "ServerKillTaskRequest":
		return new(schedulerx.ServerKillTaskRequest)
	case "ServerCheckTaskMasterRequest":
		return new(schedulerx.ServerCheckTaskMasterRequest)
	case "MasterNotifyWorkerPullRequest":
		return new(schedulerx.MasterNotifyWorkerPullRequest)
	case "ServerThreadDumpRequest":
		return new(schedulerx.ServerThreadDumpRequest)
	case "ServerCallbackCalendarRequest":
		return new(schedulerx.ServerCallbackCalendarRequest)
	case "WorkerReportJobInstanceStatusResponse":
		return new(schedulerx.WorkerReportJobInstanceStatusResponse)
	default:
		// Literal nil so the caller's `== nil` check sees a nil interface.
		return nil
	}
}