pkg/pb/v1/write.go (178 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package v1 import ( "bytes" "encoding/hex" "time" "github.com/pkg/errors" "google.golang.org/protobuf/proto" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" ) const fieldFlagLength = 9 var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 0}}} var ( // TagFlag is a flag suffix to identify the encoding method. TagFlag = make([]byte, fieldFlagLength) strDelimiter = []byte("\n") nullTag = &modelv1.TagValue{Value: &modelv1.TagValue_Null{}} nullTagValue = TagValue{} errUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value") errMalformedElement = errors.New("element is malformed") errMalformedField = errors.New("field is malformed") ) // MarshalTagValue encodes modelv1.TagValue to bytes. func MarshalTagValue(tagValue *modelv1.TagValue) ([]byte, error) { fv, err := ParseTagValue(tagValue) if err != nil { return nil, err } val := fv.GetValue() if val != nil { return val, nil } return fv.marshalArr(), nil } // TagValue seels single value and array value. type TagValue struct { value []byte arr [][]byte splitter []byte } // GetValue returns the single value. func (fv TagValue) GetValue() []byte { if len(fv.value) < 1 { return nil } return fv.value } // GetArr returns the array value. func (fv TagValue) GetArr() [][]byte { if len(fv.arr) < 1 { return nil } return fv.arr } func newValue(value []byte) TagValue { return TagValue{ value: value, } } func newValueWithSplitter(splitter []byte) *TagValue { return &TagValue{ splitter: splitter, } } func appendValue(fv *TagValue, value []byte) *TagValue { if fv == nil { fv = &TagValue{} } fv.arr = append(fv.arr, value) return fv } func (fv *TagValue) marshalArr() []byte { switch len(fv.arr) { case 0: return nil case 1: return fv.arr[0] } n := len(fv.splitter) * (len(fv.arr) - 1) for i := 0; i < len(fv.arr); i++ { n += len(fv.arr[i]) } buf := bytes.NewBuffer(nil) buf.Grow(n) buf.Write(fv.arr[0]) for _, v := range fv.arr[1:] { if fv.splitter != nil { buf.Write(fv.splitter) } buf.Write(v) } return buf.Bytes() } // ParseTagValue decodes modelv1.TagValue to TagValue. func ParseTagValue(tagValue *modelv1.TagValue) (TagValue, error) { switch x := tagValue.GetValue().(type) { case *modelv1.TagValue_Null: return nullTagValue, nil case *modelv1.TagValue_Str: return newValue([]byte(x.Str.GetValue())), nil case *modelv1.TagValue_Int: return newValue(convert.Int64ToBytes(x.Int.GetValue())), nil case *modelv1.TagValue_StrArray: fv := newValueWithSplitter(strDelimiter) for _, v := range x.StrArray.GetValue() { fv = appendValue(fv, []byte(v)) } return *fv, nil case *modelv1.TagValue_IntArray: var fv *TagValue for _, i := range x.IntArray.GetValue() { fv = appendValue(fv, convert.Int64ToBytes(i)) } return *fv, nil case *modelv1.TagValue_BinaryData: return newValue(bytes.Clone(x.BinaryData)), nil } return TagValue{}, errUnsupportedTagForIndexField } // EncodeFamily encodes a tag family to bytes by referring to its specification. func EncodeFamily(familySpec *databasev1.TagFamilySpec, family *modelv1.TagFamilyForWrite) ([]byte, error) { if len(family.GetTags()) > len(familySpec.GetTags()) { return nil, errors.Wrap(errMalformedElement, "tag number is more than expected") } data := &modelv1.TagFamilyForWrite{} for ti, tag := range family.GetTags() { tagSpec := familySpec.GetTags()[ti] tType, isNull := tagValueTypeConv(tag) if !isNull && tType != tagSpec.GetType() { return nil, errors.Wrapf(errMalformedElement, "tag %s type is unexpected", tagSpec.GetName()) } if tagSpec.IndexedOnly { data.Tags = append(data.Tags, nullTag) } else { data.Tags = append(data.Tags, tag) } } return proto.Marshal(data) } // DecodeFieldValue decodes bytes to field value based on its specification. func DecodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) (*modelv1.FieldValue, error) { switch fieldSpec.GetFieldType() { case databasev1.FieldType_FIELD_TYPE_STRING: return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: string(fieldValue)}}}, nil case databasev1.FieldType_FIELD_TYPE_INT: if len(fieldValue) == 0 { return zeroFieldValue, nil } if len(fieldValue) != 8 { return nil, errors.WithMessagef(errMalformedField, "the length of encoded field value(int64) %s is %d, less than 8", hex.EncodeToString(fieldValue), len(fieldValue)) } return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: convert.BytesToInt64(fieldValue)}}}, nil case databasev1.FieldType_FIELD_TYPE_FLOAT: if len(fieldValue) == 0 { return zeroFieldValue, nil } if len(fieldValue) != 8 { return nil, errors.WithMessagef(errMalformedField, "the length of encoded field value(float64) %s is %d, less than 8", hex.EncodeToString(fieldValue), len(fieldValue)) } return &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: convert.BytesToFloat64(fieldValue)}}}, nil case databasev1.FieldType_FIELD_TYPE_DATA_BINARY: return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: fieldValue}}, nil } return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}, nil } // EncoderFieldFlag encodes the encoding method, compression method, and interval into bytes. func EncoderFieldFlag(fieldSpec *databasev1.FieldSpec, interval time.Duration) []byte { encodingMethod := byte(fieldSpec.GetEncodingMethod().Number()) compressionMethod := byte(fieldSpec.GetCompressionMethod().Number()) bb := make([]byte, fieldFlagLength) bb[0] = encodingMethod<<4 | compressionMethod copy(bb[1:], convert.Int64ToBytes(int64(interval))) return bb } // DecodeFieldFlag decodes the encoding method, compression method, and interval from bytes. func DecodeFieldFlag(key []byte) (*databasev1.FieldSpec, time.Duration, error) { if len(key) < fieldFlagLength { return nil, 0, errors.WithMessagef(errMalformedField, "flag %s is invalid", hex.EncodeToString(key)) } b := key[len(key)-9:] return &databasev1.FieldSpec{ EncodingMethod: databasev1.EncodingMethod(int32(b[0]) >> 4), CompressionMethod: databasev1.CompressionMethod(int32(b[0] & 0x0F)), }, time.Duration(convert.BytesToInt64(b[1:])), nil }