pkg/eas/types/codec.go (243 lines of code) (raw):
package types
import (
"bytes"
"encoding/base64"
"encoding/binary"
"encoding/json"
"github.com/alibaba/pairec/v2/pkg/eas/types/queue_service_protos"
"google.golang.org/protobuf/proto"
"io"
)
type (
// pbDataFrameCodec is the Protobuf implementation of the data frame codec.
pbDataFrameCodec struct{}
// pbAttributesCodec is the Protobuf implementation of the attributes codec.
pbAttributesCodec struct{}
// jsonDataFrameCodec is the JSON implementation of the data frame codec.
jsonDataFrameCodec struct{}
// jsonAttributesCodec is the JSON implementation of the attributes codec.
jsonAttributesCodec struct{}
)
// Media type identifiers used to select a codec.
const (
// ContentTypeProtobuf is the media type served by the Protobuf codecs.
ContentTypeProtobuf = "application/vnd.google.protobuf"
// ContentTypeFlatbuffer is the Flatbuffers media type. The codec lookup
// functions in this file have no case for it and return nil.
ContentTypeFlatbuffer = "application/x-flatbuffers"
// ContentTypeJSON is the media type served by the JSON codecs.
ContentTypeJSON = "application/json"
)
// DataFrameCodecFor picks the DataFrameCodec matching contentType.
// It returns nil when contentType is neither ContentTypeProtobuf nor
// ContentTypeJSON.
func DataFrameCodecFor(contentType string) DataFrameCodec {
	if contentType == ContentTypeProtobuf {
		return &pbDataFrameCodec{}
	}
	if contentType == ContentTypeJSON {
		return &jsonDataFrameCodec{}
	}
	return nil
}
// AttributesCodecFor picks the AttributesCodec matching contentType.
// It returns nil when contentType is neither ContentTypeProtobuf nor
// ContentTypeJSON.
func AttributesCodecFor(contentType string) AttributesCodec {
	if contentType == ContentTypeProtobuf {
		return &pbAttributesCodec{}
	}
	if contentType == ContentTypeJSON {
		return &jsonAttributesCodec{}
	}
	return nil
}
// EncodeList serializes list as one DataFrameListProto message and writes the
// resulting bytes to w with a single Write call.
//
// Each frame's Message is included, matching what Encode sends for a single
// frame. It returns the marshaling error, or the error reported by w.Write.
func (p *pbDataFrameCodec) EncodeList(list []DataFrame, w io.Writer) error {
	dfProto := queue_service_protos.DataFrameListProto{}
	for _, df := range list {
		dfProto.Index = append(dfProto.Index, &queue_service_protos.DataFrameProto{
			Index:   df.Index.Uint64(),
			Data:    df.Data,
			Tags:    df.Tags,
			Message: df.Message,
		})
	}
	data, err := proto.Marshal(&dfProto)
	if err != nil {
		return err
	}
	// Surface write failures instead of reporting success for a list that
	// was not fully written.
	_, err = w.Write(data)
	return err
}
// DecodeList parses bytes as a DataFrameListProto message and converts every
// entry into a DataFrame, including its Message.
//
// It returns a nil slice and the unmarshal error when bytes is not a valid
// message.
func (p *pbDataFrameCodec) DecodeList(bytes []byte) ([]DataFrame, error) {
	dfProto := queue_service_protos.DataFrameListProto{}
	if err := proto.Unmarshal(bytes, &dfProto); err != nil {
		return nil, err
	}
	ret := make([]DataFrame, 0, len(dfProto.Index))
	for _, idx := range dfProto.Index {
		ret = append(ret, DataFrame{
			Data:    idx.Data,
			Index:   FromUint64(idx.Index),
			Message: idx.Message,
			Tags:    idx.Tags,
		})
	}
	return ret, nil
}
// MediaType returns ContentTypeProtobuf, the content type of this codec.
func (*pbDataFrameCodec) MediaType() string {
	return ContentTypeProtobuf
}
// Encode serializes frame as a DataFrameProto message and writes the resulting
// bytes to w with a single Write call.
//
// It returns the marshaling error, or the error reported by w.Write.
func (p *pbDataFrameCodec) Encode(frame DataFrame, w io.Writer) error {
	dfProto := queue_service_protos.DataFrameProto{
		Data:    frame.Data,
		Tags:    frame.Tags,
		Index:   frame.Index.Uint64(),
		Message: frame.Message,
	}
	data, err := proto.Marshal(&dfProto)
	if err != nil {
		return err
	}
	// Surface write failures instead of reporting success for a frame that
	// was not fully written.
	_, err = w.Write(data)
	return err
}
// Decode parses bytes as a DataFrameProto message and copies its Tags, Index,
// Data and Message into frame.
//
// frame is left untouched when bytes is not a valid message, in which case the
// unmarshal error is returned.
func (p *pbDataFrameCodec) Decode(bytes []byte, frame *DataFrame) error {
	dfProto := queue_service_protos.DataFrameProto{}
	if err := proto.Unmarshal(bytes, &dfProto); err != nil {
		return err
	}
	frame.Tags = dfProto.Tags
	frame.Index = FromUint64(dfProto.Index)
	frame.Data = dfProto.Data
	// Encode transmits Message, so restore it here to keep round trips lossless.
	frame.Message = dfProto.Message
	return nil
}
// MediaType returns ContentTypeProtobuf, the content type of this codec.
func (*pbAttributesCodec) MediaType() string {
	return ContentTypeProtobuf
}
// Encode serializes a as an AttributesProto message and writes the resulting
// bytes to w with a single Write call.
//
// It returns the marshaling error, or the error reported by w.Write.
func (p *pbAttributesCodec) Encode(a Attributes, w io.Writer) error {
	aProto := queue_service_protos.AttributesProto{Attributes: a}
	data, err := proto.Marshal(&aProto)
	if err != nil {
		return err
	}
	// Surface write failures instead of reporting success for attributes
	// that were not fully written.
	_, err = w.Write(data)
	return err
}
// Decode parses bytes as an AttributesProto message and stores the decoded
// attributes in *a. On an unmarshal error *a is left unchanged and the error
// is returned.
func (p *pbAttributesCodec) Decode(bytes []byte, a *Attributes) error {
	var decoded queue_service_protos.AttributesProto
	if err := proto.Unmarshal(bytes, &decoded); err != nil {
		return err
	}
	*a = Attributes(decoded.Attributes)
	return nil
}
// MediaType returns ContentTypeJSON, the content type of this codec.
func (*jsonAttributesCodec) MediaType() string {
	return ContentTypeJSON
}
// Encode streams attr to w as JSON using a json.Encoder and returns the
// encoder's error, if any.
func (j *jsonAttributesCodec) Encode(attr Attributes, w io.Writer) error {
	enc := json.NewEncoder(w)
	return enc.Encode(attr)
}
// Decode reads the first JSON value found in data into attributes using a
// json.Decoder and returns the decoder's error, if any.
func (j *jsonAttributesCodec) Decode(data []byte, attributes *Attributes) error {
	dec := json.NewDecoder(bytes.NewReader(data))
	return dec.Decode(attributes)
}
// MediaType returns ContentTypeJSON, the content type of this codec.
func (*jsonDataFrameCodec) MediaType() string {
	return ContentTypeJSON
}
// dataFrameJSON is the JSON wire form of a DataFrame as produced and consumed
// by jsonDataFrameCodec.
type dataFrameJSON struct {
// Index is the frame index as an unsigned 64-bit integer.
Index uint64 `json:"index"`
// Message is left out of the encoded output when it is empty.
Message string `json:"message,omitempty"`
// Tags holds the frame's string key/value tags.
Tags map[string]string `json:"tags"`
// Data is the frame payload encoded with standard base64.
Data string `json:"data"`
}
// Encode writes frame to w as a JSON object. The payload bytes are carried as
// a standard base64 string in the "data" field.
func (j *jsonDataFrameCodec) Encode(frame DataFrame, w io.Writer) error {
	encoded := base64.StdEncoding.EncodeToString(frame.Data)
	payload := dataFrameJSON{
		Index:   frame.Index.Uint64(),
		Message: frame.Message,
		Tags:    frame.Tags,
		Data:    encoded,
	}
	return json.NewEncoder(w).Encode(payload)
}
// EncodeList writes list to w as a JSON array of frame objects. Every payload
// is carried as a standard base64 string; an empty or nil list is written as
// an empty array.
func (j *jsonDataFrameCodec) EncodeList(list []DataFrame, w io.Writer) error {
	out := make([]dataFrameJSON, len(list))
	for k := range list {
		encoded := base64.StdEncoding.EncodeToString(list[k].Data)
		out[k] = dataFrameJSON{
			Index:   list[k].Index.Uint64(),
			Message: list[k].Message,
			Tags:    list[k].Tags,
			Data:    encoded,
		}
	}
	return json.NewEncoder(w).Encode(out)
}
// Decode parses i as a JSON frame object, base64-decodes its "data" field and
// overwrites *frame with the result. *frame is left unchanged when either the
// JSON or the base64 decoding fails.
func (j *jsonDataFrameCodec) Decode(i []byte, frame *DataFrame) error {
	var parsed dataFrameJSON
	if err := json.NewDecoder(bytes.NewReader(i)).Decode(&parsed); err != nil {
		return err
	}
	raw, err := base64.StdEncoding.DecodeString(parsed.Data)
	if err != nil {
		return err
	}
	*frame = DataFrame{
		Index:   FromUint64(parsed.Index),
		Message: parsed.Message,
		Tags:    parsed.Tags,
		Data:    raw,
	}
	return nil
}
// DecodeList parses i as a JSON array of frame objects and converts each one
// into a DataFrame, base64-decoding its "data" field. The first JSON or base64
// error aborts decoding and is returned together with a nil slice.
func (j *jsonDataFrameCodec) DecodeList(i []byte) ([]DataFrame, error) {
	var parsed []dataFrameJSON
	if err := json.NewDecoder(bytes.NewReader(i)).Decode(&parsed); err != nil {
		return nil, err
	}
	out := make([]DataFrame, len(parsed))
	for k := range parsed {
		raw, err := base64.StdEncoding.DecodeString(parsed[k].Data)
		if err != nil {
			return nil, err
		}
		out[k] = DataFrame{
			Index:   FromUint64(parsed[k].Index),
			Message: parsed[k].Message,
			Tags:    parsed[k].Tags,
			Data:    raw,
		}
	}
	return out, nil
}
// frameTooLargeError is returned by lengthDelimitedFrameWriter.Write when the
// body length does not fit in the 4-byte length prefix.
type frameTooLargeError struct{}

// Error implements the error interface.
func (frameTooLargeError) Error() string {
	return "frame body exceeds the maximum length encodable in a 4-byte prefix"
}

// lengthDelimitedFrameWriter wraps an io.Writer and prefixes every Write with
// the length of the written data.
type lengthDelimitedFrameWriter struct {
	w io.Writer
	// h is scratch space for the length header, reused across calls.
	h [4]byte
}

// NewLengthDelimitedFrameWriter returns an io.Writer that emits each Write to
// w as one length-prefixed frame.
func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer {
	return &lengthDelimitedFrameWriter{w: w}
}

// Write writes a single frame to the nested writer, prepending it with the
// length in bytes of data (as a 4 byte, big-endian uint32).
//
// The returned count covers only the body bytes; it is 0 when the header could
// not be written. Data longer than the prefix can describe is rejected before
// anything is written, because a truncated length would corrupt the framing.
func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) {
	// Untyped constant: only ever compared as uint64, so it is safe on
	// platforms where int is 32 bits wide.
	const maxFrameLen = 1<<32 - 1
	if uint64(len(data)) > maxFrameLen {
		return 0, frameTooLargeError{}
	}
	binary.BigEndian.PutUint32(w.h[:], uint32(len(data)))
	n, err := w.w.Write(w.h[:])
	if err != nil {
		return 0, err
	}
	if n != len(w.h) {
		return 0, io.ErrShortWrite
	}
	return w.w.Write(data)
}
// lengthDelimitedFrameReader decodes length-prefixed frames from an underlying
// stream.
type lengthDelimitedFrameReader struct {
	r io.ReadCloser
	// remaining counts the body bytes of the current frame that have not yet
	// been returned to the caller; when it is not positive the next Read
	// starts by reading a new frame header.
	remaining int
}

// NewLengthDelimitedFrameReader returns an io.Reader that will decode length-prefixed
// frames off of a stream.
//
// The protocol is:
//
//	stream: message ...
//	message: prefix body
//	prefix: 4 byte uint32 in BigEndian order, denotes length of body
//	body: bytes (0..prefix)
//
// If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortBuffer
// will be returned along with the number of bytes read.
func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser {
	return &lengthDelimitedFrameReader{r: r}
}

// Read attempts to read an entire frame into data. If data is too small,
// Read fills it, returns io.ErrShortBuffer, and subsequent calls deliver the
// rest of the same frame. A frame is complete when err is nil.
//
// io.EOF is returned only when the stream ends before any byte of a new frame
// header. A stream that ends inside a header or a body yields
// io.ErrUnexpectedEOF, and any other error from the underlying reader is
// returned as is together with the number of body bytes read.
func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) {
	if r.remaining <= 0 {
		var header [4]byte
		// io.ReadFull reports io.EOF only if no header byte was available
		// and io.ErrUnexpectedEOF for a partial header.
		if _, err := io.ReadFull(r.r, header[:]); err != nil {
			return 0, err
		}
		r.remaining = int(binary.BigEndian.Uint32(header[:]))
	}

	limit := r.remaining
	if limit > len(data) {
		limit = len(data)
	}
	n, err := io.ReadFull(r.r, data[:limit])
	r.remaining -= n
	if err != nil {
		// A read error must not be reported as io.ErrShortBuffer: that would
		// tell the caller to retry with a bigger buffer on a broken stream.
		if err == io.EOF {
			// The header promised more body bytes than the stream holds.
			err = io.ErrUnexpectedEOF
		}
		return n, err
	}
	if r.remaining > 0 {
		return n, io.ErrShortBuffer
	}
	return n, nil
}

// Close closes the underlying reader.
func (r *lengthDelimitedFrameReader) Close() error {
	return r.r.Close()
}