internal/remoting/codec/akka_codec.go (197 lines of code) (raw):

/* * Copyright (c) 2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package codec import ( "fmt" "google.golang.org/protobuf/proto" "github.com/alibaba/schedulerx-worker-go/internal/proto/akka" "github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx" "github.com/alibaba/schedulerx-worker-go/internal/utils" ) type Options struct { isMessageContainerSerializer bool selectionEnvelopePattern []*akka.Selection } type Option func(*Options) func WithMessageContainerSerializer() Option { return func(o *Options) { o.isMessageContainerSerializer = true } } func WithSelectionEnvelopePattern(pattern []*akka.Selection) Option { return func(o *Options) { o.selectionEnvelopePattern = pattern } } func EncodeAkkaMessage(rawMsg proto.Message, recipientPath, senderPath, manifest string, opts ...Option) (*akka.AkkaProtocolMessage, error) { var ( payloadData []byte err error ) options := new(Options) for _, opt := range opts { opt(options) } marshaledMsg, err := proto.Marshal(rawMsg) if err != nil { return nil, err } if options.isMessageContainerSerializer { selectionEnvelope := &akka.SelectionEnvelope{ EnclosedMessage: marshaledMsg, // use ProtobufSerializer // ref: https://github.com/akka/akka/blob/main/akka-remote/src/main/resources/reference.conf#L110 SerializerId: proto.Int32(2), Pattern: options.selectionEnvelopePattern, MessageManifest: []byte(manifest), WildcardFanOut: proto.Bool(false), } selectionEnvelopeData, err := proto.Marshal(selectionEnvelope) if err != nil { return nil, fmt.Errorf("Marshal SelectionEnvelope failed, err=%s ", err.Error()) } serializedMessage := &akka.SerializedMessage{ Message: selectionEnvelopeData, // use MessageContainerSerializer // ref: https://github.com/akka/akka/blob/main/akka-remote/src/main/resources/reference.conf#L112 SerializerId: proto.Int32(6), } envelopeContainer := &akka.AckAndEnvelopeContainer{ Envelope: &akka.RemoteEnvelope{ Recipient: &akka.ActorRefData{ Path: proto.String(recipientPath), }, Message: serializedMessage, Sender: &akka.ActorRefData{ Path: proto.String(senderPath), }, }, } payloadData, err = proto.Marshal(envelopeContainer) if err != nil { return nil, fmt.Errorf("Marshal AckAndEnvelopeContainer failed, err=%s ", err.Error()) } } else { envelopeContainer := &akka.AckAndEnvelopeContainer{ Envelope: &akka.RemoteEnvelope{ Recipient: &akka.ActorRefData{ Path: proto.String(recipientPath), }, Message: &akka.SerializedMessage{ Message: marshaledMsg, // use ProtobufSerializer // ref: https://github.com/akka/akka/blob/main/akka-remote/src/main/resources/reference.conf#L110 SerializerId: proto.Int32(2), MessageManifest: []byte(manifest), }, Sender: &akka.ActorRefData{ Path: proto.String(senderPath), }, }, } payloadData, err = proto.Marshal(envelopeContainer) if err != nil { return nil, fmt.Errorf("Marshal AckAndEnvelopeContainer failed, err=%s ", err.Error()) } } return &akka.AkkaProtocolMessage{ Payload: payloadData, }, nil } func DecodeAkkaMessage(msg *akka.AkkaProtocolMessage) (interface{}, string, error) { envelopeContainer := new(akka.AckAndEnvelopeContainer) if err := proto.Unmarshal(msg.Payload, envelopeContainer); err != nil { return nil, "", err } envelope := envelopeContainer.Envelope if msg.Instruction != nil { switch msg.Instruction.GetCommandType() { case akka.CommandType_ASSOCIATE, akka.CommandType_DISASSOCIATE, akka.CommandType_HEARTBEAT, akka.CommandType_DISASSOCIATE_SHUTTING_DOWN, akka.CommandType_DISASSOCIATE_QUARANTINED: return msg.GetInstruction(), "", nil default: return nil, "", fmt.Errorf("Unknown msg instruction=%s, decode failed ", msg.Instruction.GetCommandType()) } } else if envelope != nil && envelope.Sender != nil && envelope.Message != nil { var ( msgType string err error msgRawData []byte ) switch *envelope.Message.SerializerId { case 2: // use ProtobufSerializer // ref: https://github.com/akka/akka/blob/main/akka-remote/src/main/resources/reference.conf#L110 msgType, err = utils.GetMsgType(string(envelope.Message.MessageManifest)) if err != nil { return fmt.Errorf("Get message type from manifest failed, err=%s ", err.Error()), "", nil } msgRawData = envelope.Message.Message case 6: // use MessageContainerSerializer // ref: https://github.com/akka/akka/blob/main/akka-remote/src/main/resources/reference.conf#L112 innerMsg := new(akka.SelectionEnvelope) if err := proto.Unmarshal(envelope.Message.Message, innerMsg); err != nil { return fmt.Errorf("Unmarshal envelope.Message.Message to SelectionEnvelope failed, err=%s ", err.Error()), "", nil } msgType, err = utils.GetMsgType(string(innerMsg.MessageManifest)) if err != nil { return fmt.Errorf("Get message type from manifest failed, err=%s ", err.Error()), "", nil } msgRawData = innerMsg.EnclosedMessage default: return fmt.Errorf("Unknown serializerId=%d in envelope.Message.Message ", *envelope.Message.SerializerId), "", nil } senderPath := envelope.Sender.GetPath() switch msgType { case "WorkerHeartBeatResponse": msg := new(schedulerx.WorkerHeartBeatResponse) if err := proto.Unmarshal(msgRawData, msg); err != nil { return nil, "", err } return msg, senderPath, nil case "ServerSubmitJobInstanceRequest": msg := new(schedulerx.ServerSubmitJobInstanceRequest) if err := proto.Unmarshal(msgRawData, msg); err != nil { return nil, "", err } return msg, senderPath, nil case "ServerKillJobInstanceRequest": msg := new(schedulerx.ServerKillJobInstanceRequest) if err := proto.Unmarshal(msgRawData, msg); err != nil { return nil, "", err } return msg, senderPath, nil case "ServerKillTaskRequest": msg := new(schedulerx.ServerKillTaskRequest) if err := proto.Unmarshal(msgRawData, msg); err != nil { return nil, "", err } return msg, senderPath, nil case "ServerCheckTaskMasterRequest": msg := new(schedulerx.ServerCheckTaskMasterRequest) if err := proto.Unmarshal(msgRawData, msg); err != nil { return nil, "", err } return msg, senderPath, nil case "MasterNotifyWorkerPullRequest": msg := new(schedulerx.MasterNotifyWorkerPullRequest) if err := proto.Unmarshal(msgRawData, msg); err != nil { return nil, "", err } return msg, senderPath, nil case "ServerThreadDumpRequest": msg := new(schedulerx.ServerThreadDumpRequest) if err := proto.Unmarshal(msgRawData, msg); err != nil { return nil, "", err } return msg, senderPath, nil case "ServerCallbackCalendarRequest": msg := new(schedulerx.ServerCallbackCalendarRequest) if err := proto.Unmarshal(msgRawData, msg); err != nil { return nil, "", err } return msg, senderPath, nil case "WorkerReportJobInstanceStatusResponse": msg := new(schedulerx.WorkerReportJobInstanceStatusResponse) if err := proto.Unmarshal(msgRawData, msg); err != nil { return nil, "", err } return msg, senderPath, nil default: return nil, "", fmt.Errorf("Unknown message type=%s, decode failed ", msgType) } } return nil, "", fmt.Errorf("Unknown message=%+v, decode failed ", msg) }