banyand/stream/block_metadata.go (271 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package stream
import (
"errors"
"fmt"
"sort"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/model"
)
type dataBlock struct {
offset uint64
size uint64
}
func (d *dataBlock) reset() {
d.offset = 0
d.size = 0
}
func (d *dataBlock) copyFrom(src *dataBlock) {
d.offset = src.offset
d.size = src.size
}
func (d *dataBlock) marshal(dst []byte) []byte {
dst = encoding.VarUint64ToBytes(dst, d.offset)
dst = encoding.VarUint64ToBytes(dst, d.size)
return dst
}
func (d *dataBlock) unmarshal(src []byte) []byte {
src, n := encoding.BytesToVarUint64(src)
d.offset = n
src, n = encoding.BytesToVarUint64(src)
d.size = n
return src
}
type blockMetadata struct {
tagFamilies map[string]*dataBlock
tagProjection []model.TagProjection
timestamps timestampsMetadata
elementIDs elementIDsMetadata
seriesID common.SeriesID
uncompressedSizeBytes uint64
count uint64
}
func (bm *blockMetadata) copyFrom(src *blockMetadata) {
bm.seriesID = src.seriesID
bm.uncompressedSizeBytes = src.uncompressedSizeBytes
bm.count = src.count
bm.timestamps.copyFrom(&src.timestamps)
bm.elementIDs.copyFrom(&src.elementIDs)
for k, db := range src.tagFamilies {
if bm.tagFamilies == nil {
bm.tagFamilies = make(map[string]*dataBlock)
}
bm.tagFamilies[k] = &dataBlock{}
bm.tagFamilies[k].copyFrom(db)
}
}
func (bm *blockMetadata) getTagFamilyMetadata(name string) *dataBlock {
if bm.tagFamilies == nil {
bm.tagFamilies = make(map[string]*dataBlock)
}
tf, ok := bm.tagFamilies[name]
if !ok {
tf = &dataBlock{}
bm.tagFamilies[name] = tf
}
return tf
}
func (bm *blockMetadata) reset() {
bm.seriesID = 0
bm.uncompressedSizeBytes = 0
bm.count = 0
bm.timestamps.reset()
bm.elementIDs.reset()
for k := range bm.tagFamilies {
bm.tagFamilies[k].reset()
delete(bm.tagFamilies, k)
}
bm.tagProjection = bm.tagProjection[:0]
}
func (bm *blockMetadata) marshal(dst []byte) []byte {
dst = bm.seriesID.AppendToBytes(dst)
dst = encoding.VarUint64ToBytes(dst, bm.uncompressedSizeBytes)
dst = encoding.VarUint64ToBytes(dst, bm.count)
dst = bm.timestamps.marshal(dst)
dst = bm.elementIDs.marshal(dst)
dst = encoding.VarUint64ToBytes(dst, uint64(len(bm.tagFamilies)))
// make sure the order of tagFamilies is stable
keys := make([]string, 0, len(bm.tagFamilies))
for k := range bm.tagFamilies {
keys = append(keys, k)
}
sort.Strings(keys)
for _, name := range keys {
cf := bm.tagFamilies[name]
dst = encoding.EncodeBytes(dst, convert.StringToBytes(name))
dst = cf.marshal(dst)
}
return dst
}
func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) {
if len(src) < 8 {
return nil, errors.New("cannot unmarshal blockMetadata from less than 8 bytes")
}
bm.seriesID = common.SeriesID(encoding.BytesToUint64(src))
src = src[8:]
src, n := encoding.BytesToVarUint64(src)
bm.uncompressedSizeBytes = n
src, n = encoding.BytesToVarUint64(src)
bm.count = n
src = bm.timestamps.unmarshal(src)
src = bm.elementIDs.unmarshal(src)
src, n = encoding.BytesToVarUint64(src)
if n > 0 {
if bm.tagFamilies == nil {
bm.tagFamilies = make(map[string]*dataBlock, n)
}
var nameBytes []byte
var err error
for i := uint64(0); i < n; i++ {
src, nameBytes, err = encoding.DecodeBytes(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err)
}
tf := &dataBlock{}
src = tf.unmarshal(src)
bm.tagFamilies[string(nameBytes)] = tf
}
}
return src, nil
}
func (bm *blockMetadata) less(other *blockMetadata) bool {
if bm.seriesID == other.seriesID {
return bm.timestamps.min < other.timestamps.min
}
return bm.seriesID < other.seriesID
}
func generateBlockMetadata() *blockMetadata {
v := blockMetadataPool.Get()
if v == nil {
return &blockMetadata{}
}
return v
}
func releaseBlockMetadata(bm *blockMetadata) {
bm.reset()
blockMetadataPool.Put(bm)
}
var blockMetadataPool = pool.Register[*blockMetadata]("stream-blockMetadata")
type blockMetadataArray struct {
arr []blockMetadata
}
func (bma *blockMetadataArray) reset() {
for i := range bma.arr {
bma.arr[i].reset()
}
bma.arr = bma.arr[:0]
}
var blockMetadataArrayPool = pool.Register[*blockMetadataArray]("stream-blockMetadataArray")
func generateBlockMetadataArray() *blockMetadataArray {
v := blockMetadataArrayPool.Get()
if v == nil {
return &blockMetadataArray{}
}
return v
}
func releaseBlockMetadataArray(bma *blockMetadataArray) {
bma.reset()
blockMetadataArrayPool.Put(bma)
}
type timestampsMetadata struct {
dataBlock
min int64
max int64
elementIDsOffset uint64
encodeType encoding.EncodeType
}
func (tm *timestampsMetadata) reset() {
tm.dataBlock.reset()
tm.min = 0
tm.max = 0
tm.encodeType = 0
tm.elementIDsOffset = 0
}
func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) {
tm.dataBlock.copyFrom(&src.dataBlock)
tm.min = src.min
tm.max = src.max
tm.encodeType = src.encodeType
tm.elementIDsOffset = src.elementIDsOffset
}
func (tm *timestampsMetadata) marshal(dst []byte) []byte {
dst = tm.dataBlock.marshal(dst)
dst = encoding.Uint64ToBytes(dst, uint64(tm.min))
dst = encoding.Uint64ToBytes(dst, uint64(tm.max))
dst = append(dst, byte(tm.encodeType))
dst = encoding.VarUint64ToBytes(dst, tm.elementIDsOffset)
return dst
}
func (tm *timestampsMetadata) unmarshal(src []byte) []byte {
src = tm.dataBlock.unmarshal(src)
tm.min = int64(encoding.BytesToUint64(src))
src = src[8:]
tm.max = int64(encoding.BytesToUint64(src))
src = src[8:]
tm.encodeType = encoding.EncodeType(src[0])
src = src[1:]
src, n := encoding.BytesToVarUint64(src)
tm.elementIDsOffset = n
return src
}
type elementIDsMetadata struct {
dataBlock
encodeType encoding.EncodeType
}
func (em *elementIDsMetadata) reset() {
em.dataBlock.reset()
em.encodeType = 0
}
func (em *elementIDsMetadata) copyFrom(src *elementIDsMetadata) {
em.dataBlock.copyFrom(&src.dataBlock)
em.encodeType = src.encodeType
}
func (em *elementIDsMetadata) marshal(dst []byte) []byte {
dst = em.dataBlock.marshal(dst)
dst = append(dst, byte(em.encodeType))
return dst
}
func (em *elementIDsMetadata) unmarshal(src []byte) []byte {
src = em.dataBlock.unmarshal(src)
em.encodeType = encoding.EncodeType(src[0])
return src[1:]
}
func unmarshalBlockMetadata(dst []blockMetadata, src []byte) ([]blockMetadata, error) {
dstOrig := dst
var pre *blockMetadata
for len(src) > 0 {
if len(dst) < cap(dst) {
dst = dst[:len(dst)+1]
} else {
dst = append(dst, blockMetadata{})
}
bm := &dst[len(dst)-1]
tail, err := bm.unmarshal(src)
if err != nil {
return dstOrig, fmt.Errorf("cannot unmarshal blockMetadata entries: %w", err)
}
src = tail
// Validate the order of blockMetadata during unmarshalling
if pre != nil {
if err := validateBlockMetadataOrder(pre, bm); err != nil {
return dstOrig, err
}
}
pre = bm
}
return dst, nil
}
func validateBlockMetadataOrder(pre, cur *blockMetadata) error {
if cur.seriesID < pre.seriesID {
return fmt.Errorf("unexpected blockMetadata with smaller seriesID=%d after bigger seriesID=%d", cur.seriesID, pre.seriesID)
}
if cur.seriesID != pre.seriesID {
return nil
}
tmCur := cur.timestamps
tmPre := pre.timestamps
if tmCur.min < tmPre.min {
return fmt.Errorf("unexpected blockMetadata with smaller timestamp=%d after bigger timestamp=%d", tmCur.min, tmPre.min)
}
return nil
}