datahub/batchrecord.go (222 lines of code) (raw):
package datahub
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
)
// batchRecordHeaderSize is the encoded size in bytes of a batch header:
// magic(4) + version(4) + length(4) + rawSize(4) + crc32(4) +
// attributes(2) + recordCount(4), matching the offsets used by
// serializeBatchHeader and deserializeBatchHeader.
const batchRecordHeaderSize int = 4 + 4 + 4 + 4 + 4 + 2 + 4
var (
	// batchMagicBytes is the 4-byte marker written at offset 0 of every batch.
	batchMagicBytes = []byte("DHUB")
	// batchMagicNum is batchMagicBytes decoded as a little-endian uint32; it is
	// what the deserializer compares the first header word against.
	batchMagicNum = int(binary.LittleEndian.Uint32(batchMagicBytes))
)
// calculateCrc32 returns the CRC-32C (Castagnoli polynomial) checksum of buf.
// An empty or nil buf yields 0.
func calculateCrc32(buf []byte) uint32 {
	return crc32.Update(0, crc32.MakeTable(crc32.Castagnoli), buf)
}
// respMeta carries response-level metadata that deserialize passes through to
// binaryRecord2DhRecord for every decoded record. Field meanings are inferred
// from their names only (NOTE(review): confirm against the server protocol).
type respMeta struct {
	cursor, nextCursor           string
	sequence, systemTime, serial int64
}
// batchRecordHeader is the decoded form of the fixed-size header that
// precedes every batch body. On the wire every field is little-endian at
// these byte offsets: magic 0, version 4, length 8, rawSize 12, crc32 16,
// attributes 20 (2 bytes), recordCount 22.
type batchRecordHeader struct {
	// length is the total encoded size (header + possibly compressed body);
	// rawSize is the body size before compression.
	magic, version, length, rawSize int
	// crc32 is the CRC-32C of the encoded body; a stored value of 0 is not
	// verified on read.
	crc32 uint32
	// attributes: the low two bits select the compression type.
	attributes  int16
	recordCount int
}
// batchRecord is an in-memory batch: an optional header plus the binary
// records it carries, in order.
type batchRecord struct {
	// header is filled in by serializeBatchRecord; the parseBatchRecord
	// functions in this file leave it nil.
	header  *batchRecordHeader
	records []*binaryRecord
}
// batchSerializer turns a list of IRecord values into the batch wire format
// (header followed by the encoded, optionally compressed records).
type batchSerializer struct {
	// cType selects the compressor applied to the batch body; its value is
	// also stored in the low two bits of the header attributes.
	cType CompressorType
	// schemaClient is the registry client supplied at construction; the same
	// client is handed to bSerializer by newBatchSerializer.
	schemaClient *schemaRegistryClient
	// bSerializer converts and encodes individual records.
	bSerializer *binaryRecordContextSerializer
}
// newBatchSerializer returns a serializer for records of the given
// project/topic whose batch bodies are compressed according to cType.
// schemaClient is stored on the serializer and shared with the per-record
// binary serializer.
func newBatchSerializer(project, topic string, cType CompressorType, schemaClient *schemaRegistryClient) *batchSerializer {
	s := &batchSerializer{
		cType:        cType,
		schemaClient: schemaClient,
	}
	s.bSerializer = &binaryRecordContextSerializer{
		projectName:  project,
		topicName:    topic,
		schemaClient: schemaClient,
	}
	return s
}
// serialize converts records to their binary form and encodes them as a
// single batch buffer (header followed by the encoded records). Any
// conversion or encoding error is returned unchanged.
func (serializer *batchSerializer) serialize(records []IRecord) ([]byte, error) {
	batch, err := serializer.parseBatchRecord(records)
	if err == nil {
		return serializer.serializeBatchRecord(batch)
	}
	return nil, err
}
// parseBatchRecord converts a list of records into a batchRecord
// (dh record list => batch record) by turning each one into its binary form.
// Order is preserved, and the first conversion error aborts the whole batch.
// The returned batch has no header yet.
func (serializer *batchSerializer) parseBatchRecord(records []IRecord) (*batchRecord, error) {
	converted := make([]*binaryRecord, len(records))
	for i, record := range records {
		bRecord, err := serializer.bSerializer.dhRecord2BinaryRecord(record)
		if err != nil {
			return nil, err
		}
		converted[i] = bRecord
	}
	return &batchRecord{records: converted}, nil
}
// serializeBatchHeader encodes bHeader into a fresh batchRecordHeaderSize-byte
// slice, little-endian. The first four bytes are always batchMagicBytes
// (bHeader.magic itself is not consulted). These are followed by version,
// length, rawSize and crc32 (4 bytes each), attributes (2 bytes) and
// recordCount (4 bytes).
func (serializer *batchSerializer) serializeBatchHeader(bHeader *batchRecordHeader) []byte {
	le := binary.LittleEndian
	out := make([]byte, batchRecordHeaderSize)
	pos := copy(out, batchMagicBytes)
	words := []uint32{
		uint32(bHeader.version),
		uint32(bHeader.length),
		uint32(bHeader.rawSize),
		bHeader.crc32,
	}
	for _, w := range words {
		le.PutUint32(out[pos:], w)
		pos += 4
	}
	le.PutUint16(out[pos:], uint16(bHeader.attributes))
	le.PutUint32(out[pos+2:], uint32(bHeader.recordCount))
	return out
}
// serializeBatchRecord encodes batch into its wire form: a
// batchRecordHeaderSize-byte header followed by each record's binary encoding.
// The body is compressed when the serializer's cType has a compressor.
//
// As a side effect it fills in batch.header, allocating one if nil. rawSize is
// the uncompressed body size, length is the final total size, and crc32 is the
// CRC-32C of the final (possibly compressed) body.
func (serializer *batchSerializer) serializeBatchRecord(batch *batchRecord) ([]byte, error) {
	expected := batchRecordHeaderSize
	for _, bRecord := range batch.records {
		expected += bRecord.getRecordSize()
	}

	buf := new(bytes.Buffer)
	buf.Grow(expected)
	// Reserve room for the header; it is overwritten once every field is known.
	buf.Write(make([]byte, batchRecordHeaderSize))
	for _, bRecord := range batch.records {
		if err := serializer.bSerializer.serializeBinaryRecord(buf, bRecord); err != nil {
			return nil, err
		}
	}
	payload := buf.Bytes()

	if batch.header == nil {
		batch.header = &batchRecordHeader{}
	}
	hdr := batch.header
	hdr.magic = batchMagicNum
	hdr.version = 0
	hdr.length = len(payload)
	hdr.rawSize = hdr.length - batchRecordHeaderSize
	hdr.attributes = int16(serializer.cType.toValue() & 3)
	hdr.recordCount = len(batch.records)

	// compressIfNeed may swap in a compressed body and update hdr.length, so
	// the CRC must be computed on its result, not on payload.
	out, err := serializer.compressIfNeed(payload, batch)
	if err != nil {
		return nil, err
	}
	hdr.crc32 = calculateCrc32(out[batchRecordHeaderSize:])
	copy(out, serializer.serializeBatchHeader(hdr))
	return out, nil
}
// compressIfNeed compresses the body of data (everything after the header)
// when the serializer's cType maps to a compressor. In that case it updates
// batch.header.length to the new total size. Otherwise data is returned as-is.
//
// NOTE: the compressed result is built with append on data[:header], so it
// reuses data's backing array when capacity allows; callers must not rely on
// the original body bytes afterwards.
func (serializer *batchSerializer) compressIfNeed(data []byte, batch *batchRecord) ([]byte, error) {
	compressor := getCompressor(serializer.cType)
	if compressor == nil {
		return data, nil
	}
	body, err := compressor.Compress(data[batchRecordHeaderSize:])
	if err != nil {
		return nil, err
	}
	out := append(data[:batchRecordHeaderSize], body...)
	batch.header.length = len(out)
	return out, nil
}
// batchDeserializer turns a batch wire buffer back into a list of IRecord.
type batchDeserializer struct {
	// schemaClient is the registry client supplied at construction; the same
	// client is handed to bSerializer by newBatchDeserializer.
	schemaClient *schemaRegistryClient
	// bSerializer decodes and converts the individual binary records.
	bSerializer *binaryRecordContextSerializer
}
// newBatchDeserializer returns a deserializer for batches read from the given
// project/topic/shard. schema and schemaClient are handed to the per-record
// binary serializer; schemaClient is also kept on the deserializer itself.
func newBatchDeserializer(project, topic, shardId string, schema *RecordSchema, schemaClient *schemaRegistryClient) *batchDeserializer {
	d := &batchDeserializer{schemaClient: schemaClient}
	d.bSerializer = &binaryRecordContextSerializer{
		projectName:  project,
		topicName:    topic,
		shardId:      shardId,
		schema:       schema,
		schemaClient: schemaClient,
	}
	return d
}
// deserialize decodes a complete batch buffer into IRecord values. meta is
// passed through to each record's conversion. Any parse or conversion error
// is returned unchanged.
func (deserializer *batchDeserializer) deserialize(data []byte, meta *respMeta) ([]IRecord, error) {
	batch, err := deserializer.parseBatchRecord(data)
	if err == nil {
		return deserializer.deserializeBatchRecord(batch, meta)
	}
	return nil, err
}
// deserializeBatchHeader decodes and validates the little-endian batch header
// at the start of data. data must be the complete batch (header plus body).
// The checks run in this order:
//   - the magic number must match batchMagicNum;
//   - the stored length must equal len(data);
//   - when the stored CRC is non-zero, it must match the CRC-32C of the body.
//
// A stored CRC of 0 is accepted without verification.
func (deserializer *batchDeserializer) deserializeBatchHeader(data []byte) (*batchRecordHeader, error) {
	if len(data) < batchRecordHeaderSize {
		return nil, fmt.Errorf("read batch header fail")
	}

	le := binary.LittleEndian
	header := &batchRecordHeader{
		magic:       int(le.Uint32(data[0:4])),
		version:     int(le.Uint32(data[4:8])),
		length:      int(le.Uint32(data[8:12])),
		rawSize:     int(le.Uint32(data[12:16])),
		crc32:       le.Uint32(data[16:20]),
		attributes:  int16(le.Uint16(data[20:22])),
		recordCount: int(le.Uint32(data[22:26])),
	}

	switch {
	case header.magic != batchMagicNum:
		return nil, fmt.Errorf("check magic number fail")
	case header.length != len(data):
		return nil, fmt.Errorf("check payload length fail")
	case header.crc32 == 0:
		// No checksum stored: skip verification.
		return header, nil
	}

	if actual := calculateCrc32(data[batchRecordHeaderSize:header.length]); actual != header.crc32 {
		return nil, fmt.Errorf("check crc fail. expect:%d, real:%d", header.crc32, actual)
	}
	return header, nil
}
// parseBatchRecord decodes a complete wire buffer into a batchRecord
// ([]byte => batch record). It validates the header via
// deserializeBatchHeader and decompresses the body when the header attributes
// select a compressor. It then reads exactly header.recordCount binary records
// from the result. The returned batch's header field is left nil.
func (deserializer *batchDeserializer) parseBatchRecord(data []byte) (*batchRecord, error) {
	batchHeader, err := deserializer.deserializeBatchHeader(data)
	if err != nil {
		return nil, err
	}
	// recordCount comes straight off the wire, and the CRC check is skipped
	// when the stored CRC is 0, so it cannot be trusted blindly. On 32-bit
	// platforms the uint32->int conversion can yield a negative value, which
	// would make the make() below panic.
	if batchHeader.recordCount < 0 {
		return nil, fmt.Errorf("check record count fail: %d", batchHeader.recordCount)
	}

	// Skip the batch header; everything after it is the (possibly compressed) body.
	rawBuf, err := deserializer.decompressIfNeed(batchHeader, data[batchRecordHeaderSize:])
	if err != nil {
		return nil, err
	}

	// Bound the preallocation by the body size so a corrupted count cannot
	// trigger a multi-gigabyte allocation before any record is read. This is
	// only a capacity hint; append still grows the slice when needed.
	capHint := batchHeader.recordCount
	if capHint > len(rawBuf) {
		capHint = len(rawBuf)
	}
	batch := &batchRecord{records: make([]*binaryRecord, 0, capHint)}

	reader := bytes.NewReader(rawBuf)
	for idx := 0; idx < batchHeader.recordCount; idx++ {
		bRecord, err := deserializer.bSerializer.deserializeBinaryRecord(reader)
		if err != nil {
			return nil, err
		}
		batch.records = append(batch.records, bRecord)
	}
	return batch, nil
}
// deserializeBatchRecord converts each binary record in batch back into an
// IRecord. It uses the record's own schema and passes meta through to
// binaryRecord2DhRecord. Order is preserved, and the first conversion error
// aborts the whole batch.
func (deserializer *batchDeserializer) deserializeBatchRecord(batch *batchRecord, meta *respMeta) ([]IRecord, error) {
	out := make([]IRecord, len(batch.records))
	for i, bRecord := range batch.records {
		record, err := deserializer.bSerializer.binaryRecord2DhRecord(bRecord, meta, bRecord.schema)
		if err != nil {
			return nil, err
		}
		out[i] = record
	}
	return out, nil
}
// decompressIfNeed returns the uncompressed batch body. The compression type
// is taken from the low two bits of header.attributes. When it maps to a
// compressor, data is decompressed using header.rawSize as the expected size;
// otherwise data is returned unchanged.
func (deserializer *batchDeserializer) decompressIfNeed(header *batchRecordHeader, data []byte) ([]byte, error) {
	compressor := getCompressor(getCompressTypeFromValue(int(header.attributes) & 3))
	if compressor == nil {
		return data, nil
	}
	return compressor.DeCompress(data, int64(header.rawSize))
}