datahub/binaryrecord.go (531 lines of code) (raw):

package datahub import ( "bytes" "encoding/binary" "fmt" "io" "math" "reflect" "github.com/shopspring/decimal" ) const ( fieldCountSize int = 4 intByteSize int = 4 byteSizePerField int = 8 binaryRecordHeaderSize int = 16 ) type binaryRecordHeader struct { encodeType int32 schemaVersion int32 totalSize int32 attrOffset int32 } func newBinaryRecordHeader(schemaVersion, recordSize, nextOffset int) *binaryRecordHeader { return &binaryRecordHeader{ encodeType: 1, schemaVersion: int32(schemaVersion), totalSize: int32(recordSize), attrOffset: int32(nextOffset), } } type binaryRecord struct { parsedAttr bool fieldStartOffset int nextOffset int fieldCount int attrLen int schemaVersion int schema *RecordSchema header *binaryRecordHeader data []byte attributes map[string]string } func newBinaryRecordForDeserialize(buf []byte, header *binaryRecordHeader, schema *RecordSchema) *binaryRecord { bRecord := &binaryRecord{ data: buf, header: header, schema: schema, schemaVersion: int(header.schemaVersion), parsedAttr: false, attrLen: 0, } bRecord.initVariable() return bRecord } func newBinaryRecordForSerialize(version int, schema *RecordSchema) *binaryRecord { bRecord := &binaryRecord{ schemaVersion: version, schema: schema, parsedAttr: true, attrLen: 0, } bRecord.initVariable() minAllocSize := bRecord.getMinAllocSize(bRecord.fieldCount) bRecord.data = make([]byte, minAllocSize) bRecord.nextOffset = minAllocSize return bRecord } func (bRecord *binaryRecord) initVariable() { if bRecord.schema != nil { bRecord.fieldCount = bRecord.schema.Size() } else { bRecord.fieldCount = 1 } bRecord.fieldStartOffset = bRecord.getFixHeaderLength(bRecord.fieldCount) } func (bRecord *binaryRecord) getFixHeaderLength(fieldCount int) int { return binaryRecordHeaderSize + fieldCountSize + (((fieldCount + 63) >> 6) << 3) } func (bRecord *binaryRecord) getMinAllocSize(fieldCount int) int { return bRecord.getFixHeaderLength(fieldCount) + fieldCount*byteSizePerField } func (bRecord *binaryRecord) getRecordSize() int { return intByteSize + bRecord.attrLen + bRecord.nextOffset } func (bRecord *binaryRecord) addAttribute(key string, value string) { if bRecord.attributes == nil { bRecord.attributes = make(map[string]string) } bRecord.attributes[key] = value bRecord.attrLen += len(key) + len(value) + intByteSize*2 } func (bRecord *binaryRecord) getAttributes() map[string]string { bRecord.parseAttributesIfNeed() return bRecord.attributes } func (bRecord *binaryRecord) parseAttributesIfNeed() error { if !bRecord.parsedAttr { if bRecord.header == nil { bRecord.header = &binaryRecordHeader{ encodeType: 0, schemaVersion: int32(bRecord.schemaVersion), totalSize: int32(bRecord.getRecordSize()), attrOffset: int32(bRecord.nextOffset), } } offset := bRecord.header.attrOffset attrSize := binary.LittleEndian.Uint32(bRecord.data[offset:]) if attrSize != 0 && bRecord.attributes == nil { bRecord.attributes = make(map[string]string, attrSize) } if uint32(len(bRecord.data)) < attrSize { return fmt.Errorf("check data len failed") } offset = offset + 4 for i := uint32(0); i < attrSize; i = i + 1 { keyLen := int32(binary.LittleEndian.Uint32(bRecord.data[offset:])) offset = offset + 4 key := string(bRecord.data[offset : offset+keyLen]) offset = offset + keyLen valueLen := int32(binary.LittleEndian.Uint32(bRecord.data[offset:])) offset = offset + 4 value := string(bRecord.data[offset : offset+valueLen]) bRecord.attributes[key] = value offset = offset + valueLen } bRecord.parsedAttr = true } return nil } func (bRecord *binaryRecord) getFieldOffset(index int) int { return byteSizePerField*index + bRecord.fieldStartOffset } func (bRecord *binaryRecord) setNotNullAt(index int) { nullOffset := binaryRecordHeaderSize + fieldCountSize + (index >> 3) value := bRecord.data[nullOffset] | byte(uint32(1)<<uint32(index&7)) bRecord.data[nullOffset] = value } func (bRecord *binaryRecord) isFieldNull(index int) bool { nullOffset := binaryRecordHeaderSize + fieldCountSize + (index >> 3) value := bRecord.data[nullOffset] & byte(1<<uint32(index&7)) return value == 0 } func (bRecord *binaryRecord) checkFieldIndex(index int) error { if index >= bRecord.fieldCount { return fmt.Errorf("filed index: %d exceed field num: %d", index, bRecord.fieldCount) } return nil } func (bRecord *binaryRecord) alignSize(size int) int { return (size + 7) & (^7) } func (bRecord *binaryRecord) setField(index int, data interface{}) error { if err := bRecord.checkFieldIndex(index); err != nil { return err } if data == nil { return nil } bRecord.setNotNullAt(index) offset := bRecord.getFieldOffset(index) if bRecord.schema != nil { field := bRecord.schema.Fields[index] switch field.Type { case STRING: str, ok := data.(String) if !ok { return fmt.Errorf("value type [%v] dismatch field type [STRING]", reflect.TypeOf(data)) } if err := bRecord.writeStr(offset, []byte(str)); err != nil { return err } case DECIMAL: val, ok := data.(Decimal) if !ok { return fmt.Errorf("value type [%v] dismatch field type [DECIMAL]", reflect.TypeOf(data)) } if err := bRecord.writeStr(offset, []byte(val.String())); err != nil { return err } case BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, TIMESTAMP, FLOAT, DOUBLE: val, err := bRecord.convertToUInt64(data) if err != nil { return err } binary.LittleEndian.PutUint64(bRecord.data[offset:], val) default: return fmt.Errorf("invalid field type [%v]", field.Type) } } else { buf, ok := data.([]byte) if !ok { return fmt.Errorf("only support write byte[] for no schema") } if err := bRecord.writeStr(offset, buf); err != nil { return err } } return nil } func (bRecord *binaryRecord) getField(index int) (interface{}, error) { if err := bRecord.checkFieldIndex(index); err != nil { return nil, err } if bRecord.isFieldNull(index) { return nil, nil } offset := bRecord.getFieldOffset(index) if bRecord.schema == nil { byteData := bRecord.readByte(offset) return byteData, nil } field := bRecord.schema.Fields[index] switch field.Type { case STRING: str := bRecord.readStr(offset) return str, nil case DECIMAL: str := bRecord.readStr(offset) return decimal.NewFromString(str) case FLOAT: bits := binary.LittleEndian.Uint32(bRecord.data[offset:]) return math.Float32frombits(bits), nil case DOUBLE: bits := binary.LittleEndian.Uint64(bRecord.data[offset:]) return math.Float64frombits(bits), nil case BOOLEAN: val := binary.LittleEndian.Uint64(bRecord.data[offset:]) return val == 1, nil case TINYINT: val := binary.LittleEndian.Uint64(bRecord.data[offset:]) return int8(val), nil case SMALLINT: val := binary.LittleEndian.Uint64(bRecord.data[offset:]) return int16(val), nil case INTEGER: val := binary.LittleEndian.Uint64(bRecord.data[offset:]) return int32(val), nil case BIGINT, TIMESTAMP: val := binary.LittleEndian.Uint64(bRecord.data[offset:]) return int64(val), nil default: return nil, fmt.Errorf("invalid field type [%v]", field.Type) } } func (bRecord *binaryRecord) writeStr(offset int, data []byte) error { length := len(data) if length < 7 { num := copy(bRecord.data[offset:], data) for i := num; i < 7; i = i + 1 { bRecord.data[offset+i] = byte(0) } bRecord.data[offset+7] = byte(0x80) | byte(length) } else { offsetAndSize := ((int64(bRecord.nextOffset - binaryRecordHeaderSize)) << 32) | int64(length) binary.LittleEndian.PutUint64(bRecord.data[offset:], uint64(offsetAndSize)) buf := bytes.NewBuffer(bRecord.data) buf.Write(data) needSize := bRecord.alignSize(length) padSize := needSize - length if padSize > 0 { for i := 0; i < padSize; i = i + 1 { buf.WriteByte(0) } } bRecord.data = buf.Bytes() bRecord.nextOffset += needSize } return nil } func (bRecord *binaryRecord) readByte(offset int) []byte { data := binary.LittleEndian.Uint64(bRecord.data[offset:]) isLittleStr := (data & (uint64(0x80) << 56)) != 0 if isLittleStr { length := int((data >> 56) & 0x07) return bRecord.data[offset : offset+length] } else { strOffset := binaryRecordHeaderSize + int(data>>32) strLen := int(data & math.MaxUint32) return bRecord.data[strOffset : strOffset+strLen] } } func (bRecord *binaryRecord) readStr(offset int) string { data := binary.LittleEndian.Uint64(bRecord.data[offset:]) isLittleStr := (data & (uint64(0x80) << 56)) != 0 if isLittleStr { length := int((data >> 56) & 0x07) return string(bRecord.data[offset : offset+length]) } else { strOffset := binaryRecordHeaderSize + int(data>>32) strLen := int(data & math.MaxUint32) return string(bRecord.data[strOffset : strOffset+strLen]) } } // for TINYINT, SMALLINT, INTEGER, BIGINT, TIMESTAMP, FLOAT, DOUBLE func (bRecord *binaryRecord) convertToUInt64(data interface{}) (uint64, error) { var val uint64 switch v := data.(type) { case Tinyint: val = uint64(v) case Smallint: val = uint64(v) case Integer: val = uint64(v) case Bigint: val = uint64(v) case Timestamp: val = uint64(v) case Float: fVal := float32(v) bits := math.Float32bits(fVal) val = uint64(bits) case Double: fVal := float64(v) val = math.Float64bits(fVal) case Boolean: if v { val = uint64(1) } else { val = uint64(0) } default: return 0, fmt.Errorf("value type[%T] not match field type", reflect.ValueOf(val)) } return val, nil } type binaryRecordContextSerializer struct { projectName string topicName string shardId string schema *RecordSchema schemaClient *schemaRegistryClient } func (serializer *binaryRecordContextSerializer) serializeRecordHeader(bHeader *binaryRecordHeader) []byte { buf := make([]byte, binaryRecordHeaderSize) binary.LittleEndian.PutUint32(buf, uint32(bHeader.encodeType)) binary.LittleEndian.PutUint32(buf[4:], uint32(bHeader.schemaVersion)) binary.LittleEndian.PutUint32(buf[8:], uint32(bHeader.totalSize)) binary.LittleEndian.PutUint32(buf[12:], uint32(bHeader.attrOffset)) return buf } // BinaryRecord => []byte func (serializer *binaryRecordContextSerializer) serializeBinaryRecord(writer *bytes.Buffer, bRecord *binaryRecord) error { if bRecord.header == nil { bRecord.header = newBinaryRecordHeader(bRecord.schemaVersion, bRecord.getRecordSize(), bRecord.nextOffset) } headerBuf := serializer.serializeRecordHeader(bRecord.header) copy(bRecord.data, headerBuf) _, err := writer.Write(bRecord.data) if err != nil { return err } if err = binary.Write(writer, binary.LittleEndian, int32(len(bRecord.attributes))); err != nil { return err } for key, val := range bRecord.attributes { if err = binary.Write(writer, binary.LittleEndian, int32(len(key))); err != nil { return err } if _, err = writer.WriteString(key); err != nil { return err } if err = binary.Write(writer, binary.LittleEndian, int32(len(val))); err != nil { return err } if _, err = writer.WriteString(val); err != nil { return err } } return nil } func (serializer *binaryRecordContextSerializer) deserializeRecordHeader(reader *bytes.Reader) (*binaryRecordHeader, error) { if reader.Len() < binaryRecordHeaderSize { return nil, fmt.Errorf("data length is not enough for BinaryRecordHeader(%d)", binaryRecordHeaderSize) } header := &binaryRecordHeader{} if err := binary.Read(reader, binary.LittleEndian, &header.encodeType); err != nil { return nil, err } if err := binary.Read(reader, binary.LittleEndian, &header.schemaVersion); err != nil { return nil, err } if err := binary.Read(reader, binary.LittleEndian, &header.totalSize); err != nil { return nil, err } if err := binary.Read(reader, binary.LittleEndian, &header.attrOffset); err != nil { return nil, err } return header, nil } // []byte => BinaryRecord func (serializer *binaryRecordContextSerializer) deserializeBinaryRecord(reader *bytes.Reader) (*binaryRecord, error) { bHeader, err := serializer.deserializeRecordHeader(reader) if err != nil { return nil, err } // 读取header完成之后重置到读header之前的位点 if _, err := reader.Seek(-int64(binaryRecordHeaderSize), io.SeekCurrent); err != nil { return nil, err } if reader.Len() < int(bHeader.totalSize) { return nil, fmt.Errorf("check record header length fail, need: %d, real: %d", bHeader.totalSize, reader.Len()) } var schema *RecordSchema = nil if bHeader.schemaVersion != -1 { if serializer.schemaClient != nil { schema, err = serializer.getSchemeByVersion(int(bHeader.schemaVersion)) if err != nil { return nil, err } } else { schema = serializer.schema } } buf := make([]byte, bHeader.totalSize) if _, err = reader.Read(buf); err != nil { return nil, err } return newBinaryRecordForDeserialize(buf, bHeader, schema), nil } func (serializer *binaryRecordContextSerializer) getSchemeByVersion(version int) (*RecordSchema, error) { if serializer.schemaClient != nil { return serializer.schemaClient.getSchemaByVersion(serializer.projectName, serializer.topicName, version) } return nil, nil } func (serializer *binaryRecordContextSerializer) getVersionBySchema(schema *RecordSchema) (int, error) { if serializer.schemaClient != nil { return serializer.schemaClient.getVersionBySchema(serializer.projectName, serializer.topicName, schema) } return 0, nil } func (serializer *binaryRecordContextSerializer) blob2BinaryRecord(record *BlobRecord) (*binaryRecord, error) { bRecord := newBinaryRecordForSerialize(-1, nil) if err := bRecord.setField(0, record.RawData); err != nil { return nil, err } return bRecord, nil } func (serializer *binaryRecordContextSerializer) tuple2BinaryRecord(record *TupleRecord) (*binaryRecord, error) { version := 0 if serializer.schemaClient != nil { val, err := serializer.getVersionBySchema(record.RecordSchema) if err != nil { return nil, err } version = val } bRecord := newBinaryRecordForSerialize(version, record.RecordSchema) for idx, val := range record.Values { if err := bRecord.setField(idx, val); err != nil { return nil, err } } return bRecord, nil } func (serializer *binaryRecordContextSerializer) dhRecord2BinaryRecord(record IRecord) (*binaryRecord, error) { var err error var bRecord *binaryRecord = nil switch val := record.(type) { case *TupleRecord: bRecord, err = serializer.tuple2BinaryRecord(val) if err != nil { return nil, err } case *BlobRecord: bRecord, err = serializer.blob2BinaryRecord(val) if err != nil { return nil, err } default: return nil, fmt.Errorf("invalid record type %v", reflect.TypeOf(record)) } attributes := record.GetAttributes() for key, val := range attributes { strVal, ok := val.(string) if !ok { return nil, fmt.Errorf("attribute only support map[string]string now") } bRecord.addAttribute(key, strVal) } return bRecord, nil } func (serializer *binaryRecordContextSerializer) binaryRecord2DhRecord(bRecord *binaryRecord, meta *respMeta, schema *RecordSchema) (IRecord, error) { var record IRecord var err error if schema != nil { record, err = serializer.binary2TupleRecord(bRecord, schema) if err != nil { return nil, err } } else { record, err = serializer.binary2BlobRecord(bRecord) if err != nil { return nil, err } } baseRecord := BaseRecord{ ShardId: serializer.shardId, SystemTime: meta.systemTime, Sequence: meta.sequence, Cursor: meta.cursor, NextCursor: meta.nextCursor, Serial: meta.serial, } attributes := bRecord.getAttributes() for key, val := range attributes { baseRecord.SetAttribute(key, val) } record.SetBaseRecord(baseRecord) return record, nil } func (serializer *binaryRecordContextSerializer) binary2TupleRecord(bRecord *binaryRecord, schema *RecordSchema) (*TupleRecord, error) { record := NewTupleRecord(schema, 0) for idx := 0; idx < schema.Size(); idx = idx + 1 { val, err := bRecord.getField(idx) if err != nil { return nil, err } if val != nil { record.SetValueByIdx(idx, val) } } return record, nil } func (serializer *binaryRecordContextSerializer) binary2BlobRecord(bRecord *binaryRecord) (*BlobRecord, error) { val, err := bRecord.getField(0) if err != nil { return nil, err } data, ok := val.([]byte) if !ok { return nil, fmt.Errorf("only support write byte[] for no schema") } return NewBlobRecord(data, 0), nil }