pkg/protocol/codec/codec.go (104 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package codec
import (
"bytes"
"sync"
"vimagination.zapto.org/byteio"
"seata.apache.org/seata-go/pkg/protocol/message"
"seata.apache.org/seata-go/pkg/util/log"
)
// CodecType identifies a serialization scheme. It is the first-level key of
// the CodecManager registry.
type CodecType byte

// Codec type identifiers. Each constant is a distinct single-byte value.
const (
	CodecTypeSeata    CodecType = 0x1
	CodecTypeProtobuf CodecType = 0x2
	CodecTypeKRYO     CodecType = 0x4
	CodecTypeFST      CodecType = 0x8
)
// Codec converts messages of a single message type to and from their encoded
// body. Implementations are stored by CodecManager.RegisterCodec under the
// value returned by GetMessageType.
type Codec interface {
// Encode returns the encoded body for in. CodecManager.Encode prepends the
// two-byte type code itself, so the returned bytes carry the body only.
Encode(in interface{}) []byte
// Decode rebuilds a message from in. CodecManager.Decode strips the leading
// two-byte type code before calling, so in holds the body only.
Decode(in []byte) interface{}
// GetMessageType reports the message type this codec is registered under.
GetMessageType() message.MessageType
}
var (
// codecManager is the package-level CodecManager handed out by
// GetCodecManager.
codecManager *CodecManager
// onceCodecManager ensures GetCodecManager constructs codecManager at most
// once.
onceCodecManager = &sync.Once{}
)
// GetCodecManager returns the package-level CodecManager, creating it with an
// empty registry on first use.
//
// All reads and writes of codecManager during initialization happen inside
// onceCodecManager.Do. Checking the pointer outside the Once would be an
// unsynchronized read racing with the write performed by another goroutine's
// first call; Once.Do provides the required happens-before edge.
func GetCodecManager() *CodecManager {
	onceCodecManager.Do(func() {
		// Keep a manager that was assigned before the first call, matching the
		// previous nil-check behavior.
		if codecManager != nil {
			return
		}
		codecManager = &CodecManager{
			codecMap: make(map[CodecType]map[message.MessageType]Codec),
		}
	})
	return codecManager
}
// CodecManager is a registry of Codec implementations keyed first by CodecType
// and then by message type.
type CodecManager struct {
// mutex is held by RegisterCodec while it updates codecMap.
mutex sync.Mutex
// codecMap maps codec type -> message type -> codec.
codecMap map[CodecType]map[message.MessageType]Codec
}
// RegisterCodec stores codec under codecType, keyed by the message type the
// codec reports through GetMessageType. A codec registered earlier for the same
// codec type and message type is replaced.
//
// The registry is updated while holding the manager's mutex.
func (c *CodecManager) RegisterCodec(codecType CodecType, codec Codec) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// Lazily create the per-codec-type table the first time this type is seen.
	perMessageType, found := c.codecMap[codecType]
	if !found || perMessageType == nil {
		perMessageType = map[message.MessageType]Codec{}
		c.codecMap[codecType] = perMessageType
	}

	perMessageType[codec.GetMessageType()] = codec
}
// GetCodec returns the codec registered for msgType under codecType, or nil
// when none has been registered.
//
// The lookup holds the same mutex RegisterCodec holds while writing, so a
// lookup that overlaps a registration is not a data race on the registry maps.
// The lock is released before returning; it is never held while a codec runs.
func (c *CodecManager) GetCodec(codecType CodecType, msgType message.MessageType) Codec {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// Indexing a nil inner map yields the zero value (a nil Codec), so an
	// unknown codec type needs no separate branch.
	return c.codecMap[codecType][msgType]
}
// Decode reads the two-byte big-endian message type code at the start of in,
// looks up the codec registered for that type under codecType, and returns the
// result of decoding the remaining bytes with it.
//
// It logs an error and returns nil when in is too short to contain a type
// code, when the type code cannot be read, or when no codec is registered for
// the type code.
func (c *CodecManager) Decode(codecType CodecType, in []byte) interface{} {
	// Number of leading bytes occupied by the message type code.
	const typeCodeLen = 2

	// Guard the in[typeCodeLen:] slice below: without this, a truncated frame
	// could panic instead of being reported.
	if len(in) < typeCodeLen {
		log.Errorf("This message is too short to decode, length [%d]", len(in))
		return nil
	}

	r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
	typeCode, _, err := r.ReadInt16()
	if err != nil {
		log.Errorf("Failed to read message type code: %v", err)
		return nil
	}

	codec := c.GetCodec(codecType, message.MessageType(typeCode))
	if codec == nil {
		log.Errorf("This message type [%v] has no codec to decode", typeCode)
		return nil
	}
	return codec.Decode(in[typeCodeLen:])
}
// Encode serializes in with the codec registered for its type code under
// codecType. The result is the type code as two big-endian bytes followed by
// the body produced by the codec.
//
// It logs an error and returns nil when in does not implement
// message.MessageTypeAware (including a nil in) or when no codec is registered
// for its type code.
func (c *CodecManager) Encode(codecType CodecType, in interface{}) []byte {
	// Use the comma-ok form so an unexpected value is reported the same way as
	// a missing codec instead of panicking.
	msg, ok := in.(message.MessageTypeAware)
	if !ok {
		log.Errorf("This message [%T] is not MessageTypeAware, cannot encode", in)
		return nil
	}

	typeCode := msg.GetTypeCode()
	codec := c.GetCodec(codecType, typeCode)
	if codec == nil {
		log.Errorf("This message type [%v] has no codec to encode", typeCode)
		return nil
	}

	body := codec.Encode(in)
	typeC := uint16(typeCode)

	// Allocate once with the exact final size: 2 bytes of type code + body.
	result := make([]byte, 0, 2+len(body))
	result = append(result, byte(typeC>>8), byte(typeC))
	return append(result, body...)
}
func Init() {
// Global
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalReportResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalBeginRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalBeginResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalCommitRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalCommitResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalLockQueryRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalLockQueryResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalRollbackRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalRollbackResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalStatusRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalStatusResponseCodec{})
// Branch
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchCommitRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchCommitResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchRegisterRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchRegisterResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchReportRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchRollbackRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchRollbackResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchReportResponseCodec{})
// RM
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterRMRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterRMResponseCodec{})
// TM
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterTMRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterTMResponseCodec{})
}