datahub/binaryrecord.go (531 lines of code) (raw):
package datahub
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"math"
"reflect"
"github.com/shopspring/decimal"
)
// Byte widths of the fixed-size pieces that make up the binary record layout.
const (
	// fieldCountSize is the number of bytes reserved directly after the record
	// header and ahead of the null bitmap.
	fieldCountSize int = 4
	// intByteSize is the width of one serialized 32-bit integer.
	intByteSize int = 4
	// byteSizePerField is the width of the fixed slot that every field owns.
	byteSizePerField int = 8
	// binaryRecordHeaderSize covers the four 32-bit fields of binaryRecordHeader.
	binaryRecordHeaderSize int = 4 * intByteSize
)
// binaryRecordHeader is the fixed header found at the start of a serialized
// record. Each field is written as a little-endian 32-bit integer, in the
// order declared here.
type binaryRecordHeader struct {
	// encodeType is an encoding marker; newBinaryRecordHeader sets it to 1.
	encodeType int32
	// schemaVersion is the schema version of the record; -1 is used for
	// records written without a schema.
	schemaVersion int32
	// totalSize is the byte size of the whole record as reported by
	// getRecordSize.
	totalSize int32
	// attrOffset is the position inside the record where the attribute
	// section starts.
	attrOffset int32
}
// newBinaryRecordHeader builds the header for a record that is about to be
// serialized. recordSize becomes totalSize and nextOffset becomes attrOffset;
// all three arguments are narrowed to int32. encodeType is always 1.
func newBinaryRecordHeader(schemaVersion, recordSize, nextOffset int) *binaryRecordHeader {
	header := new(binaryRecordHeader)
	header.encodeType = 1
	header.schemaVersion = int32(schemaVersion)
	header.totalSize = int32(recordSize)
	header.attrOffset = int32(nextOffset)
	return header
}
// binaryRecord is the in-memory form of one record in the binary wire format.
// It is used both to build a record for sending (see
// newBinaryRecordForSerialize) and to read a received one (see
// newBinaryRecordForDeserialize).
type binaryRecord struct {
	// parsedAttr reports whether attributes already reflects the attribute
	// section; records built for serialization start with it set to true.
	parsedAttr bool
	// fieldStartOffset is the offset in data of the first field slot.
	fieldStartOffset int
	// nextOffset is, while serializing, the end of the written data; it is the
	// position at which the next variable-length payload is appended and is
	// also used as the header's attrOffset.
	nextOffset int
	// fieldCount is the schema size, or 1 when there is no schema.
	fieldCount int
	// attrLen accumulates the serialized size of all attributes added through
	// addAttribute (key and value bytes plus two length prefixes each).
	attrLen int
	// schemaVersion is the schema version carried in the header.
	schemaVersion int
	// schema describes the fields; nil means a single raw-bytes field.
	schema *RecordSchema
	// header is the record header; it may be nil until the record is serialized.
	header *binaryRecordHeader
	// data holds the record bytes, starting with the header region.
	data []byte
	// attributes holds the record's string attributes; it may be nil.
	attributes map[string]string
}
// newBinaryRecordForDeserialize wraps already-received record bytes.
//
// buf is kept as the record data without copying, header is the header decoded
// from it (must not be nil), and schema may be nil for a record without a
// schema. Attributes are not decoded here; parsedAttr stays false so they are
// parsed on first access.
func newBinaryRecordForDeserialize(buf []byte, header *binaryRecordHeader, schema *RecordSchema) *binaryRecord {
	rec := new(binaryRecord)
	rec.data = buf
	rec.header = header
	rec.schema = schema
	rec.schemaVersion = int(header.schemaVersion)
	rec.parsedAttr = false
	rec.attrLen = 0
	rec.initVariable()
	return rec
}
// newBinaryRecordForSerialize creates an empty record ready to receive field
// values. The data buffer is allocated at the minimum size for the schema's
// field count (one field when schema is nil) and nextOffset points at its end.
// parsedAttr is set because there is no attribute section to decode.
func newBinaryRecordForSerialize(version int, schema *RecordSchema) *binaryRecord {
	rec := &binaryRecord{
		schema:        schema,
		schemaVersion: version,
		parsedAttr:    true,
	}
	rec.initVariable()

	size := rec.getMinAllocSize(rec.fieldCount)
	rec.data, rec.nextOffset = make([]byte, size), size
	return rec
}
// initVariable derives fieldCount and fieldStartOffset from the schema. A
// record without a schema is treated as having exactly one field.
func (bRecord *binaryRecord) initVariable() {
	count := 1
	if bRecord.schema != nil {
		count = bRecord.schema.Size()
	}
	bRecord.fieldCount = count
	bRecord.fieldStartOffset = bRecord.getFixHeaderLength(count)
}
// getFixHeaderLength returns the number of bytes that precede the first field
// slot: the record header, the field-count area and the null bitmap. The
// bitmap uses one bit per field and is sized in whole 8-byte words.
func (bRecord *binaryRecord) getFixHeaderLength(fieldCount int) int {
	bitmapWords := (fieldCount + 63) >> 6
	bitmapBytes := bitmapWords << 3
	return binaryRecordHeaderSize + fieldCountSize + bitmapBytes
}
// getMinAllocSize returns the buffer size needed for the fixed part of a record
// with fieldCount fields: everything before the field slots plus one
// byteSizePerField-wide slot per field.
func (bRecord *binaryRecord) getMinAllocSize(fieldCount int) int {
	slotBytes := fieldCount * byteSizePerField
	return bRecord.getFixHeaderLength(fieldCount) + slotBytes
}
// getRecordSize returns the serialized size of the record: the data written so
// far (nextOffset), the 4-byte attribute count and the attribute bytes.
func (bRecord *binaryRecord) getRecordSize() int {
	size := bRecord.nextOffset
	size += intByteSize
	size += bRecord.attrLen
	return size
}
// addAttribute stores a string attribute on the record and keeps attrLen equal
// to the serialized size of all attributes (key bytes + value bytes + two
// 4-byte length prefixes per entry).
//
// When key is already present its value is replaced; the size of the old entry
// is removed from attrLen first so the record size derived from it stays
// correct.
func (bRecord *binaryRecord) addAttribute(key string, value string) {
	if bRecord.attributes == nil {
		bRecord.attributes = make(map[string]string)
	}
	if previous, exists := bRecord.attributes[key]; exists {
		bRecord.attrLen -= len(key) + len(previous) + intByteSize*2
	}
	bRecord.attributes[key] = value
	bRecord.attrLen += len(key) + len(value) + intByteSize*2
}
// getAttributes returns the record's attributes, decoding them from the record
// data first when that has not happened yet. The returned map may be nil.
func (bRecord *binaryRecord) getAttributes() map[string]string {
	// This signature has no error result, so a parse failure is dropped here;
	// callers that need it should call parseAttributesIfNeed themselves.
	_ = bRecord.parseAttributesIfNeed()
	attrs := bRecord.attributes
	return attrs
}
// parseAttributesIfNeed decodes the attribute section of bRecord.data into
// bRecord.attributes the first time it is called; once parsedAttr is set it is
// a no-op.
//
// The section starts at header.attrOffset and consists of little-endian 32-bit
// integers and raw bytes: an attribute count, then for every attribute a key
// length, the key bytes, a value length and the value bytes.
//
// Every offset and length is checked against len(bRecord.data), so truncated or
// corrupt input produces an error instead of an out-of-range panic. On error
// bRecord.attributes is left untouched and parsedAttr stays false.
func (bRecord *binaryRecord) parseAttributesIfNeed() error {
	if bRecord.parsedAttr {
		return nil
	}
	if bRecord.header == nil {
		bRecord.header = &binaryRecordHeader{
			encodeType:    0,
			schemaVersion: int32(bRecord.schemaVersion),
			totalSize:     int32(bRecord.getRecordSize()),
			attrOffset:    int32(bRecord.nextOffset),
		}
	}

	dataLen := len(bRecord.data)
	offset := int(bRecord.header.attrOffset)
	if offset < 0 || dataLen-offset < intByteSize {
		return fmt.Errorf("attribute offset %d out of range, data len: %d", offset, dataLen)
	}
	attrSize := binary.LittleEndian.Uint32(bRecord.data[offset:])
	offset += intByteSize

	// Each attribute needs at least its two length prefixes, which bounds the
	// count by the remaining bytes and keeps the map allocation below sane.
	if uint64(attrSize)*uint64(2*intByteSize) > uint64(dataLen-offset) {
		return fmt.Errorf("check data len failed")
	}

	// readChunk consumes one length-prefixed byte string at offset.
	readChunk := func() (string, error) {
		if dataLen-offset < intByteSize {
			return "", fmt.Errorf("attribute length prefix at offset %d exceeds data len: %d", offset, dataLen)
		}
		size := binary.LittleEndian.Uint32(bRecord.data[offset:])
		offset += intByteSize
		if uint64(size) > uint64(dataLen-offset) {
			return "", fmt.Errorf("attribute size %d at offset %d exceeds data len: %d", size, offset, dataLen)
		}
		end := offset + int(size)
		chunk := string(bRecord.data[offset:end])
		offset = end
		return chunk, nil
	}

	var parsed map[string]string
	if attrSize != 0 {
		parsed = make(map[string]string, attrSize)
	}
	for i := uint32(0); i < attrSize; i++ {
		key, err := readChunk()
		if err != nil {
			return err
		}
		value, err := readChunk()
		if err != nil {
			return err
		}
		parsed[key] = value
	}

	// Publish only after the whole section decoded cleanly.
	if attrSize != 0 {
		if bRecord.attributes == nil {
			bRecord.attributes = parsed
		} else {
			for key, value := range parsed {
				bRecord.attributes[key] = value
			}
		}
	}
	bRecord.parsedAttr = true
	return nil
}
// getFieldOffset returns the position in data of the fixed-width slot that
// belongs to the field at index.
func (bRecord *binaryRecord) getFieldOffset(index int) int {
	slotDelta := index * byteSizePerField
	return bRecord.fieldStartOffset + slotDelta
}
// setNotNullAt marks the field at index as present by setting its bit in the
// null bitmap, which starts right after the header and field-count area and
// holds eight fields per byte, lowest bit first.
func (bRecord *binaryRecord) setNotNullAt(index int) {
	pos := binaryRecordHeaderSize + fieldCountSize + (index >> 3)
	bit := byte(1) << uint(index&7)
	bRecord.data[pos] |= bit
}
// isFieldNull reports whether the field at index is null, i.e. whether its bit
// in the null bitmap is clear.
func (bRecord *binaryRecord) isFieldNull(index int) bool {
	pos := binaryRecordHeaderSize + fieldCountSize + (index >> 3)
	mask := byte(1) << uint(index&7)
	return bRecord.data[pos]&mask == 0
}
// checkFieldIndex verifies that index addresses one of the record's fields.
// It returns an error when index is negative or not smaller than fieldCount;
// without the lower-bound check a negative index would slip through and panic
// later when the null bitmap or the schema is indexed.
func (bRecord *binaryRecord) checkFieldIndex(index int) error {
	if index < 0 {
		return fmt.Errorf("field index: %d must not be negative", index)
	}
	if index >= bRecord.fieldCount {
		return fmt.Errorf("filed index: %d exceed field num: %d", index, bRecord.fieldCount)
	}
	return nil
}
// alignSize rounds size up to the next multiple of 8.
func (bRecord *binaryRecord) alignSize(size int) int {
	const alignment = 8
	return (size + alignment - 1) &^ (alignment - 1)
}
// setField writes data into the field at index.
//
// A nil data leaves the field null and returns nil. Otherwise the field is
// flagged as present and encoded according to the schema:
//   - no schema: data must be a []byte and is written as a byte string;
//   - STRING / DECIMAL: data must be a String / Decimal and is written as text;
//   - BOOLEAN and the numeric types: data is converted with convertToUInt64 and
//     stored little-endian in the field's 8-byte slot.
//
// An error is returned for an out-of-range index, a value whose Go type does
// not fit the field type, or an unknown field type.
func (bRecord *binaryRecord) setField(index int, data interface{}) error {
	if err := bRecord.checkFieldIndex(index); err != nil {
		return err
	}
	if data == nil {
		return nil
	}

	bRecord.setNotNullAt(index)
	offset := bRecord.getFieldOffset(index)

	if bRecord.schema == nil {
		raw, isBytes := data.([]byte)
		if !isBytes {
			return fmt.Errorf("only support write byte[] for no schema")
		}
		return bRecord.writeStr(offset, raw)
	}

	fieldType := bRecord.schema.Fields[index].Type
	switch fieldType {
	case STRING:
		text, isString := data.(String)
		if !isString {
			return fmt.Errorf("value type [%v] dismatch field type [STRING]", reflect.TypeOf(data))
		}
		return bRecord.writeStr(offset, []byte(text))
	case DECIMAL:
		dec, isDecimal := data.(Decimal)
		if !isDecimal {
			return fmt.Errorf("value type [%v] dismatch field type [DECIMAL]", reflect.TypeOf(data))
		}
		return bRecord.writeStr(offset, []byte(dec.String()))
	case BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, TIMESTAMP, FLOAT, DOUBLE:
		bits, err := bRecord.convertToUInt64(data)
		if err != nil {
			return err
		}
		binary.LittleEndian.PutUint64(bRecord.data[offset:], bits)
		return nil
	default:
		return fmt.Errorf("invalid field type [%v]", fieldType)
	}
}
// getField decodes the field at index.
//
// It returns (nil, nil) for a null field. Without a schema the field is
// returned as []byte. With a schema the Go type depends on the field type:
// string for STRING, the result of decimal.NewFromString for DECIMAL, float32
// for FLOAT (taken from the low 32 bits of the slot), float64 for DOUBLE, bool
// for BOOLEAN (true only when the slot equals 1), and int8/int16/int32/int64
// for TINYINT/SMALLINT/INTEGER/BIGINT and TIMESTAMP.
//
// An error is returned for an out-of-range index, an unknown field type, or a
// DECIMAL whose text cannot be parsed.
func (bRecord *binaryRecord) getField(index int) (interface{}, error) {
	if err := bRecord.checkFieldIndex(index); err != nil {
		return nil, err
	}
	if bRecord.isFieldNull(index) {
		return nil, nil
	}

	offset := bRecord.getFieldOffset(index)
	if bRecord.schema == nil {
		return bRecord.readByte(offset), nil
	}

	// slot64 reads the field's 8-byte slot as a little-endian integer.
	slot64 := func() uint64 {
		return binary.LittleEndian.Uint64(bRecord.data[offset:])
	}

	switch fieldType := bRecord.schema.Fields[index].Type; fieldType {
	case STRING:
		return bRecord.readStr(offset), nil
	case DECIMAL:
		return decimal.NewFromString(bRecord.readStr(offset))
	case FLOAT:
		low := binary.LittleEndian.Uint32(bRecord.data[offset:])
		return math.Float32frombits(low), nil
	case DOUBLE:
		return math.Float64frombits(slot64()), nil
	case BOOLEAN:
		return slot64() == 1, nil
	case TINYINT:
		return int8(slot64()), nil
	case SMALLINT:
		return int16(slot64()), nil
	case INTEGER:
		return int32(slot64()), nil
	case BIGINT, TIMESTAMP:
		return int64(slot64()), nil
	default:
		return nil, fmt.Errorf("invalid field type [%v]", fieldType)
	}
}
// writeStr stores a byte string in the field slot at offset.
//
// Values shorter than 7 bytes are kept inline in the 8-byte slot: the bytes,
// zero padding up to byte 6, and a final byte of 0x80|length.
//
// Longer values are appended to the end of bRecord.data, zero-padded to a
// multiple of 8 bytes, and the slot receives a little-endian 64-bit word whose
// high 32 bits are the payload position relative to the end of the record
// header and whose low 32 bits are the length. nextOffset advances by the
// padded length.
//
// The function currently always returns nil.
func (bRecord *binaryRecord) writeStr(offset int, data []byte) error {
	length := len(data)
	if length < 7 {
		written := copy(bRecord.data[offset:], data)
		for i := written; i < 7; i++ {
			bRecord.data[offset+i] = 0
		}
		bRecord.data[offset+7] = byte(0x80) | byte(length)
		return nil
	}

	offsetAndSize := ((int64(bRecord.nextOffset - binaryRecordHeaderSize)) << 32) | int64(length)
	binary.LittleEndian.PutUint64(bRecord.data[offset:], uint64(offsetAndSize))

	// Grow the record in place; the padding is at most 7 bytes, so a small
	// zeroed array covers it without a per-byte loop.
	var zeros [8]byte
	needSize := bRecord.alignSize(length)
	bRecord.data = append(bRecord.data, data...)
	bRecord.data = append(bRecord.data, zeros[:needSize-length]...)
	bRecord.nextOffset += needSize
	return nil
}
// readByte returns the byte string stored in the field slot at offset, as a
// sub-slice of bRecord.data (no copy).
//
// When the top bit of the slot's last byte is set the value is inline: its
// length is the low three bits of that byte and its bytes start at offset.
// Otherwise the slot's high 32 bits give the payload position relative to the
// end of the record header and the low 32 bits give its length.
func (bRecord *binaryRecord) readByte(offset int) []byte {
	const inlineFlag = uint64(0x80) << 56

	slot := binary.LittleEndian.Uint64(bRecord.data[offset:])
	if slot&inlineFlag != 0 {
		size := int((slot >> 56) & 0x07)
		return bRecord.data[offset : offset+size]
	}

	start := binaryRecordHeaderSize + int(slot>>32)
	size := int(slot & math.MaxUint32)
	return bRecord.data[start : start+size]
}
// readStr returns the byte string stored in the field slot at offset as a Go
// string (a copy of the bytes).
//
// The slot is interpreted as in readByte: a set top bit in the last byte means
// an inline value whose length is that byte's low three bits; otherwise the
// high 32 bits are the payload position relative to the end of the record
// header and the low 32 bits are the length.
func (bRecord *binaryRecord) readStr(offset int) string {
	const inlineFlag = uint64(0x80) << 56

	slot := binary.LittleEndian.Uint64(bRecord.data[offset:])
	start, size := offset, 0
	if slot&inlineFlag != 0 {
		size = int((slot >> 56) & 0x07)
	} else {
		start = binaryRecordHeaderSize + int(slot>>32)
		size = int(slot & math.MaxUint32)
	}
	return string(bRecord.data[start : start+size])
}
// convertToUInt64 turns a value for a BOOLEAN, TINYINT, SMALLINT, INTEGER,
// BIGINT, TIMESTAMP, FLOAT or DOUBLE field into the 64-bit word stored in the
// field slot.
//
// Integer types are converted with uint64(v), so negative signed values are
// sign-extended. Float stores its IEEE-754 bits in the low 32 bits, Double
// stores its full IEEE-754 bits, and Boolean stores 1 or 0.
//
// Any other Go type yields an error that names the type that was passed in.
func (bRecord *binaryRecord) convertToUInt64(data interface{}) (uint64, error) {
	switch v := data.(type) {
	case Tinyint:
		return uint64(v), nil
	case Smallint:
		return uint64(v), nil
	case Integer:
		return uint64(v), nil
	case Bigint:
		return uint64(v), nil
	case Timestamp:
		return uint64(v), nil
	case Float:
		return uint64(math.Float32bits(float32(v))), nil
	case Double:
		return math.Float64bits(float64(v)), nil
	case Boolean:
		if v {
			return 1, nil
		}
		return 0, nil
	default:
		// Report the caller's type; formatting a reflect.Value with %T would
		// always print "reflect.Value".
		return 0, fmt.Errorf("value type[%T] not match field type", data)
	}
}
// binaryRecordContextSerializer converts between datahub records and their
// binary representation for one project/topic/shard.
type binaryRecordContextSerializer struct {
	// projectName and topicName are passed to the schema registry client when
	// resolving schemas and schema versions.
	projectName string
	topicName   string
	// shardId is copied into the BaseRecord of every deserialized record.
	shardId string
	// schema is the schema used for deserialization when schemaClient is nil.
	schema *RecordSchema
	// schemaClient, when non-nil, resolves schemas by version and versions by
	// schema.
	schemaClient *schemaRegistryClient
}
// serializeRecordHeader encodes bHeader into a new binaryRecordHeaderSize-byte
// slice: encodeType, schemaVersion, totalSize and attrOffset, each as a
// little-endian 32-bit integer.
func (serializer *binaryRecordContextSerializer) serializeRecordHeader(bHeader *binaryRecordHeader) []byte {
	words := [...]int32{
		bHeader.encodeType,
		bHeader.schemaVersion,
		bHeader.totalSize,
		bHeader.attrOffset,
	}
	out := make([]byte, binaryRecordHeaderSize)
	for i, word := range words {
		binary.LittleEndian.PutUint32(out[i*4:], uint32(word))
	}
	return out
}
// serializeBinaryRecord appends the wire form of bRecord to writer
// (BinaryRecord => []byte).
//
// If the record has no header yet, one is created from its schema version,
// record size and nextOffset. The header is copied over the first bytes of
// bRecord.data, the data is written, and then the attribute section follows:
// the attribute count and, for every attribute, the key and value each
// prefixed with its length. All integers are little-endian int32. Attributes
// are emitted in map iteration order.
func (serializer *binaryRecordContextSerializer) serializeBinaryRecord(writer *bytes.Buffer, bRecord *binaryRecord) error {
	if bRecord.header == nil {
		bRecord.header = newBinaryRecordHeader(bRecord.schemaVersion, bRecord.getRecordSize(), bRecord.nextOffset)
	}
	copy(bRecord.data, serializer.serializeRecordHeader(bRecord.header))
	if _, err := writer.Write(bRecord.data); err != nil {
		return err
	}

	// writeLen emits one little-endian int32.
	writeLen := func(n int) error {
		return binary.Write(writer, binary.LittleEndian, int32(n))
	}
	// writeText emits a length-prefixed string.
	writeText := func(s string) error {
		if err := writeLen(len(s)); err != nil {
			return err
		}
		_, err := writer.WriteString(s)
		return err
	}

	if err := writeLen(len(bRecord.attributes)); err != nil {
		return err
	}
	for key, val := range bRecord.attributes {
		if err := writeText(key); err != nil {
			return err
		}
		if err := writeText(val); err != nil {
			return err
		}
	}
	return nil
}
// deserializeRecordHeader reads a record header from reader, consuming
// binaryRecordHeaderSize bytes. The four fields are little-endian int32 values
// in the order encodeType, schemaVersion, totalSize, attrOffset.
//
// An error is returned, without consuming anything, when fewer than
// binaryRecordHeaderSize bytes remain.
func (serializer *binaryRecordContextSerializer) deserializeRecordHeader(reader *bytes.Reader) (*binaryRecordHeader, error) {
	if available := reader.Len(); available < binaryRecordHeaderSize {
		return nil, fmt.Errorf("data length is not enough for BinaryRecordHeader(%d)", binaryRecordHeaderSize)
	}

	header := new(binaryRecordHeader)
	targets := [...]*int32{
		&header.encodeType,
		&header.schemaVersion,
		&header.totalSize,
		&header.attrOffset,
	}
	for _, target := range targets {
		if err := binary.Read(reader, binary.LittleEndian, target); err != nil {
			return nil, err
		}
	}
	return header, nil
}
// deserializeBinaryRecord reads one record from reader ([]byte => BinaryRecord).
//
// It decodes the header, rewinds to the start of the record and then consumes
// header.totalSize bytes (header included) as the record data. The schema is
// resolved from the header's schema version: -1 means no schema; otherwise the
// schema registry client is asked when one is configured, and serializer.schema
// is used when it is not.
//
// An error is returned when the header cannot be read, when totalSize is
// smaller than the header itself (which also covers negative values), when
// fewer than totalSize bytes remain, or when the schema lookup fails.
func (serializer *binaryRecordContextSerializer) deserializeBinaryRecord(reader *bytes.Reader) (*binaryRecord, error) {
	bHeader, err := serializer.deserializeRecordHeader(reader)
	if err != nil {
		return nil, err
	}

	// After reading the header, rewind to the position before the header so
	// the record data below includes the header bytes.
	if _, err := reader.Seek(-int64(binaryRecordHeaderSize), io.SeekCurrent); err != nil {
		return nil, err
	}

	// A corrupt size must not reach make(): a negative length panics, and a
	// buffer shorter than the header cannot hold a record.
	if int(bHeader.totalSize) < binaryRecordHeaderSize {
		return nil, fmt.Errorf("check record header length fail, total size: %d is smaller than header size: %d", bHeader.totalSize, binaryRecordHeaderSize)
	}
	if reader.Len() < int(bHeader.totalSize) {
		return nil, fmt.Errorf("check record header length fail, need: %d, real: %d", bHeader.totalSize, reader.Len())
	}

	var schema *RecordSchema
	if bHeader.schemaVersion != -1 {
		if serializer.schemaClient != nil {
			schema, err = serializer.getSchemeByVersion(int(bHeader.schemaVersion))
			if err != nil {
				return nil, err
			}
		} else {
			schema = serializer.schema
		}
	}

	buf := make([]byte, bHeader.totalSize)
	if _, err = io.ReadFull(reader, buf); err != nil {
		return nil, err
	}
	return newBinaryRecordForDeserialize(buf, bHeader, schema), nil
}
// getSchemeByVersion asks the schema registry client for the schema with the
// given version of this serializer's project and topic. Without a client it
// returns (nil, nil).
func (serializer *binaryRecordContextSerializer) getSchemeByVersion(version int) (*RecordSchema, error) {
	client := serializer.schemaClient
	if client == nil {
		return nil, nil
	}
	return client.getSchemaByVersion(serializer.projectName, serializer.topicName, version)
}
// getVersionBySchema asks the schema registry client for the version of schema
// within this serializer's project and topic. Without a client it returns
// (0, nil).
func (serializer *binaryRecordContextSerializer) getVersionBySchema(schema *RecordSchema) (int, error) {
	client := serializer.schemaClient
	if client == nil {
		return 0, nil
	}
	return client.getVersionBySchema(serializer.projectName, serializer.topicName, schema)
}
// blob2BinaryRecord converts a blob record into a binary record without a
// schema (schema version -1) whose single field holds record.RawData.
func (serializer *binaryRecordContextSerializer) blob2BinaryRecord(record *BlobRecord) (*binaryRecord, error) {
	bRecord := newBinaryRecordForSerialize(-1, nil)
	err := bRecord.setField(0, record.RawData)
	if err != nil {
		return nil, err
	}
	return bRecord, nil
}
// tuple2BinaryRecord converts a tuple record into a binary record that uses the
// tuple's schema, writing every value into the field with the same index.
//
// The schema version is looked up through the schema registry client when one
// is configured and is 0 otherwise. Any lookup or field-encoding error aborts
// the conversion.
func (serializer *binaryRecordContextSerializer) tuple2BinaryRecord(record *TupleRecord) (*binaryRecord, error) {
	// getVersionBySchema itself yields (0, nil) when no schema client is set,
	// so no separate nil check is needed here.
	version, err := serializer.getVersionBySchema(record.RecordSchema)
	if err != nil {
		return nil, err
	}

	bRecord := newBinaryRecordForSerialize(version, record.RecordSchema)
	for idx := range record.Values {
		if err := bRecord.setField(idx, record.Values[idx]); err != nil {
			return nil, err
		}
	}
	return bRecord, nil
}
// dhRecord2BinaryRecord converts a datahub record into its binary form.
//
// *TupleRecord and *BlobRecord are supported; any other implementation is
// rejected. After the fields are converted, every attribute of the record is
// added to the binary record; an attribute whose value is not a string makes
// the conversion fail.
func (serializer *binaryRecordContextSerializer) dhRecord2BinaryRecord(record IRecord) (*binaryRecord, error) {
	var (
		bRecord *binaryRecord
		err     error
	)
	switch typed := record.(type) {
	case *TupleRecord:
		bRecord, err = serializer.tuple2BinaryRecord(typed)
	case *BlobRecord:
		bRecord, err = serializer.blob2BinaryRecord(typed)
	default:
		return nil, fmt.Errorf("invalid record type %v", reflect.TypeOf(record))
	}
	if err != nil {
		return nil, err
	}

	for key, raw := range record.GetAttributes() {
		text, isString := raw.(string)
		if !isString {
			return nil, fmt.Errorf("attribute only support map[string]string now")
		}
		bRecord.addAttribute(key, text)
	}
	return bRecord, nil
}
// binaryRecord2DhRecord converts a deserialized binary record into a datahub
// record: a tuple record when schema is non-nil, a blob record otherwise.
//
// The record's BaseRecord is filled from the serializer's shard id and from
// meta (system time, sequence, cursor, next cursor, serial), and every
// attribute of the binary record is copied onto it.
//
// An error is returned when a field cannot be decoded or when the attribute
// section of the binary record cannot be parsed.
func (serializer *binaryRecordContextSerializer) binaryRecord2DhRecord(bRecord *binaryRecord, meta *respMeta, schema *RecordSchema) (IRecord, error) {
	var record IRecord
	var err error
	if schema != nil {
		record, err = serializer.binary2TupleRecord(bRecord, schema)
		if err != nil {
			return nil, err
		}
	} else {
		record, err = serializer.binary2BlobRecord(bRecord)
		if err != nil {
			return nil, err
		}
	}

	baseRecord := BaseRecord{
		ShardId:    serializer.shardId,
		SystemTime: meta.systemTime,
		Sequence:   meta.sequence,
		Cursor:     meta.cursor,
		NextCursor: meta.nextCursor,
		Serial:     meta.serial,
	}

	// Parse explicitly so a malformed attribute section is reported to the
	// caller; getAttributes alone would discard the error.
	if err = bRecord.parseAttributesIfNeed(); err != nil {
		return nil, err
	}
	for key, val := range bRecord.getAttributes() {
		baseRecord.SetAttribute(key, val)
	}

	record.SetBaseRecord(baseRecord)
	return record, nil
}
// binary2TupleRecord builds a tuple record for schema from the fields of
// bRecord. Null fields are skipped; every other decoded value is stored at the
// same index. The first field that fails to decode aborts the conversion.
func (serializer *binaryRecordContextSerializer) binary2TupleRecord(bRecord *binaryRecord, schema *RecordSchema) (*TupleRecord, error) {
	record := NewTupleRecord(schema, 0)
	for idx, total := 0, schema.Size(); idx < total; idx++ {
		val, err := bRecord.getField(idx)
		if err != nil {
			return nil, err
		}
		if val == nil {
			continue
		}
		record.SetValueByIdx(idx, val)
	}
	return record, nil
}
// binary2BlobRecord builds a blob record from the first field of bRecord. The
// field must decode to a []byte; anything else, including a null field, is
// reported as an error.
func (serializer *binaryRecordContextSerializer) binary2BlobRecord(bRecord *binaryRecord) (*BlobRecord, error) {
	raw, err := bRecord.getField(0)
	if err != nil {
		return nil, err
	}
	if payload, isBytes := raw.([]byte); isBytes {
		return NewBlobRecord(payload, 0), nil
	}
	return nil, fmt.Errorf("only support write byte[] for no schema")
}