banyand/stream/block.go (690 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "fmt" "sort" "golang.org/x/exp/slices" "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type block struct { timestamps []int64 elementIDs []uint64 tagFamilies []tagFamily } func (b *block) reset() { b.timestamps = b.timestamps[:0] b.elementIDs = b.elementIDs[:0] tff := b.tagFamilies for i := range tff { tff[i].reset() } b.tagFamilies = tff[:0] } func (b *block) mustInitFromElements(timestamps []int64, elementIDs []uint64, tagFamilies [][]tagValues) { b.reset() size := len(timestamps) if size == 0 { return } if size != len(tagFamilies) { logger.Panicf("the number of timestamps %d must match the number of tagFamilies %d", size, len(tagFamilies)) } assertTimestampsSorted(timestamps) b.timestamps = append(b.timestamps, timestamps...) b.elementIDs = append(b.elementIDs, elementIDs...) b.mustInitFromTags(tagFamilies) } func assertTimestampsSorted(timestamps []int64) { for i := range timestamps { if i > 0 && timestamps[i-1] > timestamps[i] { logger.Panicf("elements must be sorted by timestamp; got the previous element with bigger timestamp %d than the current element with timestamp %d", timestamps[i-1], timestamps[i]) } } } func (b *block) mustInitFromTags(tagFamilies [][]tagValues) { elementsLen := len(tagFamilies) if elementsLen == 0 { return } for i, tff := range tagFamilies { b.processTagFamilies(tff, i, elementsLen) } } func (b *block) processTagFamilies(tff []tagValues, i int, elementsLen int) { tagFamilies := b.resizeTagFamilies(len(tff)) for j, tf := range tff { tagFamilies[j].name = tf.tag b.processTags(tf, j, i, elementsLen) } } func (b *block) processTags(tf tagValues, tagFamilyIdx, i int, elementsLen int) { tags := b.tagFamilies[tagFamilyIdx].resizeTags(len(tf.values)) for j, t := range tf.values { tags[j].name = t.tag tags[j].resizeValues(elementsLen) tags[j].valueType = t.valueType tags[j].values[i] = t.marshal() } } func (b *block) resizeTagFamilies(tagFamiliesLen int) []tagFamily { tff := b.tagFamilies[:0] if n := tagFamiliesLen - cap(tff); n > 0 { tff = append(tff[:cap(tff)], make([]tagFamily, n)...) } tff = tff[:tagFamiliesLen] b.tagFamilies = tff return tff } func (b *block) Len() int { return len(b.timestamps) } func (b *block) mustWriteTo(sid common.SeriesID, bm *blockMetadata, ww *writers) { b.validate() bm.reset() bm.seriesID = sid bm.uncompressedSizeBytes = b.uncompressedSizeBytes() bm.count = uint64(b.Len()) mustWriteTimestampsTo(&bm.timestamps, b.timestamps, b.elementIDs, &ww.timestampsWriter) for ti := range b.tagFamilies { b.marshalTagFamily(b.tagFamilies[ti], bm, ww) } } func (b *block) validate() { timestamps := b.timestamps for i := 1; i < len(timestamps); i++ { if timestamps[i-1] > timestamps[i] { logger.Panicf("log entries must be sorted by timestamp; got the previous entry with bigger timestamp %d than the current entry with timestamp %d", timestamps[i-1], timestamps[i]) } } itemsCount := len(timestamps) if itemsCount != len(b.elementIDs) { logger.Panicf("unexpected number of values for elementIDs: got %d; want %d", len(b.elementIDs), itemsCount) } tff := b.tagFamilies for _, tf := range tff { for _, c := range tf.tags { if len(c.values) != itemsCount { logger.Panicf("unexpected number of values for tags %q: got %d; want %d", c.name, len(c.values), itemsCount) } } } } func (b *block) marshalTagFamily(tf tagFamily, bm *blockMetadata, ww *writers) { hw, w := ww.getTagMetadataWriterAndTagWriter(tf.name) cc := tf.tags cfm := generateTagFamilyMetadata() cmm := cfm.resizeTagMetadata(len(cc)) for i := range cc { cc[i].mustWriteTo(&cmm[i], w) } bb := bigValuePool.Generate() defer bigValuePool.Release(bb) bb.Buf = cfm.marshal(bb.Buf) releaseTagFamilyMetadata(cfm) tfm := bm.getTagFamilyMetadata(tf.name) tfm.offset = hw.bytesWritten tfm.size = uint64(len(bb.Buf)) if tfm.size > maxTagFamiliesMetadataSize { logger.Panicf("too big tagFamilyMetadataSize: %d bytes; mustn't exceed %d bytes", tfm.size, maxTagFamiliesMetadataSize) } hw.MustWrite(bb.Buf) } func (b *block) unmarshalTagFamily(decoder *encoding.BytesBlockDecoder, tfIndex int, name string, tagFamilyMetadataBlock *dataBlock, tagProjection []string, metaReader, valueReader fs.Reader, count int, ) { if len(tagProjection) < 1 { return } bb := bigValuePool.Generate() bb.Buf = bytes.ResizeExact(bb.Buf, int(tagFamilyMetadataBlock.size)) fs.MustReadData(metaReader, int64(tagFamilyMetadataBlock.offset), bb.Buf) tfm := generateTagFamilyMetadata() defer releaseTagFamilyMetadata(tfm) err := tfm.unmarshal(bb.Buf) if err != nil { logger.Panicf("%s: cannot unmarshal tagFamilyMetadata: %v", metaReader.Path(), err) } bigValuePool.Release(bb) b.tagFamilies[tfIndex].name = name cc := b.tagFamilies[tfIndex].resizeTags(len(tagProjection)) NEXT: for j := range tagProjection { for i := range tfm.tagMetadata { if tagProjection[j] == tfm.tagMetadata[i].name { cc[j].mustReadValues(decoder, valueReader, tfm.tagMetadata[i], uint64(b.Len())) continue NEXT } cc[j].name = tagProjection[j] cc[j].valueType = pbv1.ValueTypeUnknown cc[j].resizeValues(count) for k := range cc[j].values { cc[j].values[k] = nil } } } } func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDecoder, tfIndex int, name string, columnFamilyMetadataBlock *dataBlock, metaReader, valueReader *seqReader, ) { if columnFamilyMetadataBlock.offset != metaReader.bytesRead { logger.Panicf("offset %d must be equal to bytesRead %d", columnFamilyMetadataBlock.offset, metaReader.bytesRead) } bb := bigValuePool.Generate() bb.Buf = bytes.ResizeExact(bb.Buf, int(columnFamilyMetadataBlock.size)) metaReader.mustReadFull(bb.Buf) tfm := generateTagFamilyMetadata() defer releaseTagFamilyMetadata(tfm) err := tfm.unmarshal(bb.Buf) if err != nil { logger.Panicf("%s: cannot unmarshal columnFamilyMetadata: %v", metaReader.Path(), err) } bigValuePool.Release(bb) b.tagFamilies[tfIndex].name = name cc := b.tagFamilies[tfIndex].resizeTags(len(tfm.tagMetadata)) for i := range tfm.tagMetadata { cc[i].mustSeqReadValues(decoder, valueReader, tfm.tagMetadata[i], uint64(b.Len())) } } func (b *block) uncompressedSizeBytes() uint64 { elementsCount := uint64(b.Len()) n := elementsCount * (8 + 8) // 8 bytes for timestamp and 8 bytes for elementID tff := b.tagFamilies for i := range tff { tf := tff[i] nameLen := uint64(len(tf.name)) for _, c := range tf.tags { nameLen += uint64(len(c.name)) for _, v := range c.values { if len(v) > 0 { n += nameLen + uint64(len(v)) } } } } return n } func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm blockMetadata) { b.reset() b.timestamps, b.elementIDs = mustReadTimestampsFrom(b.timestamps, b.elementIDs, &bm.timestamps, int(bm.count), p.timestamps) _ = b.resizeTagFamilies(len(bm.tagProjection)) for i := range bm.tagProjection { name := bm.tagProjection[i].Family block, ok := bm.tagFamilies[name] if !ok { b.tagFamilies[i].name = name b.tagFamilies[i].resizeTags(len(bm.tagProjection[i].Names)) for j := range bm.tagProjection[i].Names { b.tagFamilies[i].tags[j].name = bm.tagProjection[i].Names[j] b.tagFamilies[i].tags[j].valueType = pbv1.ValueTypeUnknown b.tagFamilies[i].tags[j].resizeValues(int(bm.count)) for k := range bm.count { b.tagFamilies[i].tags[j].values[k] = nil } } continue } b.unmarshalTagFamily(decoder, i, name, block, bm.tagProjection[i].Names, p.tagFamilyMetadata[name], p.tagFamilies[name], int(bm.count)) } } func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder, seqReaders *seqReaders, bm blockMetadata) { b.reset() b.timestamps, b.elementIDs = mustSeqReadTimestampsFrom(b.timestamps, b.elementIDs, &bm.timestamps, int(bm.count), &seqReaders.timestamps) _ = b.resizeTagFamilies(len(bm.tagFamilies)) keys := make([]string, 0, len(bm.tagFamilies)) for k := range bm.tagFamilies { keys = append(keys, k) } sort.Strings(keys) for i, name := range keys { block := bm.tagFamilies[name] b.unmarshalTagFamilyFromSeqReaders(decoder, i, name, block, seqReaders.tagFamilyMetadata[name], seqReaders.tagFamilies[name]) } } // For testing purpose only. func (b *block) sortTagFamilies() { sort.Slice(b.tagFamilies, func(i, j int) bool { return b.tagFamilies[i].name < b.tagFamilies[j].name }) } func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps []int64, elementIDs []uint64, timestampsWriter *writer) { tm.reset() bb := bigValuePool.Generate() defer bigValuePool.Release(bb) bb.Buf, tm.encodeType, tm.min = encoding.Int64ListToBytes(bb.Buf[:0], timestamps) tm.max = timestamps[len(timestamps)-1] tm.offset = timestampsWriter.bytesWritten tm.elementIDsOffset = uint64(len(bb.Buf)) timestampsWriter.MustWrite(bb.Buf) bb.Buf = encoding.VarUint64sToBytes(bb.Buf[:0], elementIDs) tm.size = tm.elementIDsOffset + uint64(len(bb.Buf)) timestampsWriter.MustWrite(bb.Buf) } func mustReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm *timestampsMetadata, count int, reader fs.Reader) ([]int64, []uint64) { bb := bigValuePool.Generate() defer bigValuePool.Release(bb) bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size)) fs.MustReadData(reader, int64(tm.offset), bb.Buf) return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, count, reader.Path(), bb.Buf) } func mustDecodeTimestampsWithVersions(timestamps []int64, elementIDs []uint64, tm *timestampsMetadata, count int, path string, src []byte) ([]int64, []uint64) { if tm.size < tm.elementIDsOffset { logger.Panicf("size %d must be greater than elementIDsOffset %d", tm.size, tm.elementIDsOffset) } var err error timestamps, err = encoding.BytesToInt64List(timestamps, src[:tm.elementIDsOffset], tm.encodeType, tm.min, count) if err != nil { logger.Panicf("%s: cannot unmarshal timestamps: %v", path, err) } elementIDs = encoding.ExtendListCapacity(elementIDs, count) elementIDs = elementIDs[:count] _, err = encoding.BytesToVarUint64s(elementIDs, src[tm.elementIDsOffset:]) if err != nil { logger.Panicf("%s: cannot unmarshal element ids: %v", path, err) } return timestamps, elementIDs } func mustSeqReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm *timestampsMetadata, count int, reader *seqReader) ([]int64, []uint64) { if tm.offset != reader.bytesRead { logger.Panicf("offset %d must be equal to bytesRead %d", tm.offset, reader.bytesRead) } bb := bigValuePool.Generate() defer bigValuePool.Release(bb) bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size)) reader.mustReadFull(bb.Buf) return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, count, reader.Path(), bb.Buf) } func generateBlock() *block { v := blockPool.Get() if v == nil { return &block{} } return v } func releaseBlock(b *block) { b.reset() blockPool.Put(b) } var blockPool = pool.Register[*block]("stream-block") type blockCursor struct { p *part timestamps []int64 elementFilter posting.List elementIDs []uint64 tagFamilies []tagFamily tagValuesDecoder encoding.BytesBlockDecoder tagProjection []model.TagProjection bm blockMetadata idx int minTimestamp int64 maxTimestamp int64 } func (bc *blockCursor) reset() { bc.idx = 0 bc.p = nil bc.bm.reset() bc.minTimestamp = 0 bc.maxTimestamp = 0 bc.tagProjection = bc.tagProjection[:0] bc.timestamps = bc.timestamps[:0] bc.elementIDs = bc.elementIDs[:0] tff := bc.tagFamilies for i := range tff { tff[i].reset() } bc.tagFamilies = tff[:0] } func (bc *blockCursor) init(p *part, bm *blockMetadata, opts queryOptions) { bc.reset() bc.p = p bc.bm.copyFrom(bm) bc.minTimestamp = opts.minTimestamp bc.maxTimestamp = opts.maxTimestamp bc.tagProjection = opts.TagProjection bc.elementFilter = opts.elementFilter } func (bc *blockCursor) copyAllTo(r *model.StreamResult, desc bool) { start, end := 0, bc.idx+1 if !desc { start, end = bc.idx, len(bc.timestamps) } if end <= start { return } r.Timestamps = append(r.Timestamps, bc.timestamps[start:end]...) r.ElementIDs = append(r.ElementIDs, bc.elementIDs[start:end]...) requiredCapacity := end - start r.SIDs = append(r.SIDs, make([]common.SeriesID, requiredCapacity)...) for i := range r.SIDs[len(r.SIDs)-requiredCapacity:] { r.SIDs[len(r.SIDs)-requiredCapacity+i] = bc.bm.seriesID } if desc { slices.Reverse(r.Timestamps) slices.Reverse(r.ElementIDs) } if len(r.TagFamilies) != len(bc.tagProjection) { r.TagFamilies = make([]model.TagFamily, len(bc.tagProjection)) for i, tp := range bc.tagProjection { r.TagFamilies[i] = model.TagFamily{Name: tp.Family, Tags: make([]model.Tag, len(tp.Names))} for j, n := range tp.Names { r.TagFamilies[i].Tags[j] = model.Tag{Name: n} } } } for i, cf := range bc.tagFamilies { for j, c := range cf.tags { values := make([]*modelv1.TagValue, end-start) for k := start; k < end; k++ { if len(c.values) > k { values[k-start] = mustDecodeTagValue(c.valueType, c.values[k]) } else { values[k-start] = pbv1.NullTagValue } } if desc { slices.Reverse(values) } r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, values...) } } } func (bc *blockCursor) copyTo(r *model.StreamResult) { r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx]) r.ElementIDs = append(r.ElementIDs, bc.elementIDs[bc.idx]) r.SIDs = append(r.SIDs, bc.bm.seriesID) if len(r.TagFamilies) != len(bc.tagProjection) { for _, tp := range bc.tagProjection { tf := model.TagFamily{ Name: tp.Family, } for _, n := range tp.Names { t := model.Tag{ Name: n, } tf.Tags = append(tf.Tags, t) } r.TagFamilies = append(r.TagFamilies, tf) } } if len(bc.tagFamilies) != len(r.TagFamilies) { logger.Panicf("unexpected number of tag families: got %d; want %d", len(bc.tagFamilies), len(r.TagFamilies)) } for i, cf := range bc.tagFamilies { if len(r.TagFamilies[i].Tags) != len(cf.tags) { logger.Panicf("unexpected number of tags: got %d; want %d", len(r.TagFamilies[i].Tags), len(bc.tagProjection[i].Names)) } for i2, c := range cf.tags { if len(c.values) > bc.idx { r.TagFamilies[i].Tags[i2].Values = append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.valueType, c.values[bc.idx])) } else { r.TagFamilies[i].Tags[i2].Values = append(r.TagFamilies[i].Tags[i2].Values, pbv1.NullTagValue) } } } } func (bc *blockCursor) loadData(tmpBlock *block) bool { tmpBlock.reset() bc.bm.tagProjection = bc.tagProjection var tf map[string]*dataBlock for _, tp := range bc.tagProjection { for tfName, block := range bc.bm.tagFamilies { if tp.Family == tfName { if tf == nil { tf = make(map[string]*dataBlock, len(bc.tagProjection)) } tf[tfName] = block } } } if len(tf) == 0 { return false } bc.bm.tagFamilies = tf tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm) if len(tmpBlock.timestamps) == 0 { return false } idxList := make([]int, 0) var start, end int if bc.elementFilter != nil { for i := range tmpBlock.elementIDs { if bc.elementFilter.Contains(tmpBlock.elementIDs[i]) { idxList = append(idxList, i) bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[i]) bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[i]) } } if len(bc.timestamps) == 0 { return false } } else { s, e, ok := timestamp.FindRange(tmpBlock.timestamps, bc.minTimestamp, bc.maxTimestamp) start, end = s, e if !ok { return false } bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[s:e+1]...) bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[s:e+1]...) } for i, projection := range bc.bm.tagProjection { tf := tagFamily{ name: projection.Family, } for j, name := range projection.Names { t := tag{ name: name, } t.valueType = tmpBlock.tagFamilies[i].tags[j].valueType if len(tmpBlock.tagFamilies[i].tags[j].values) != len(tmpBlock.timestamps) { logger.Panicf("unexpected number of values for tags %q: got %d; want %d", tmpBlock.tagFamilies[i].tags[j].name, len(tmpBlock.tagFamilies[i].tags[j].values), len(tmpBlock.timestamps)) } if len(idxList) > 0 { for _, idx := range idxList { t.values = append(t.values, tmpBlock.tagFamilies[i].tags[j].values[idx]) } } else { t.values = append(t.values, tmpBlock.tagFamilies[i].tags[j].values[start:end+1]...) } tf.tags = append(tf.tags, t) } bc.tagFamilies = append(bc.tagFamilies, tf) } return len(bc.timestamps) > 0 } var blockCursorPool = pool.Register[*blockCursor]("stream-blockCursor") func generateBlockCursor() *blockCursor { v := blockCursorPool.Get() if v == nil { return &blockCursor{} } return v } func releaseBlockCursor(bc *blockCursor) { bc.reset() blockCursorPool.Put(bc) } type blockPointer struct { block bm blockMetadata idx int } func (bi *blockPointer) updateMetadata() { if len(bi.block.timestamps) == 0 { return } // only update timestamps since they are used for merging // blockWriter will recompute all fields bi.bm.timestamps.min = bi.block.timestamps[0] bi.bm.timestamps.max = bi.block.timestamps[len(bi.timestamps)-1] } func (bi *blockPointer) copyFrom(src *blockPointer) { bi.idx = 0 bi.bm.copyFrom(&src.bm) bi.appendAll(src) } func (bi *blockPointer) appendAll(b *blockPointer) { if len(b.timestamps) == 0 { return } bi.append(b, len(b.timestamps)) } var log = logger.GetLogger("stream").Named("block") func (bi *blockPointer) append(b *blockPointer, offset int) { if offset <= b.idx { return } if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 { fullTagAppend(bi, b, offset) } else { if err := fastTagAppend(bi, b, offset); err != nil { if log.Debug().Enabled() { log.Debug().Msgf("fastTagMerge failed: %v; falling back to fullTagMerge", err) } fullTagAppend(bi, b, offset) } } assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset) bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...) bi.elementIDs = append(bi.elementIDs, b.elementIDs[b.idx:offset]...) } func fastTagAppend(bi, b *blockPointer, offset int) error { if len(bi.tagFamilies) != len(b.tagFamilies) { return fmt.Errorf("unexpected number of tag families: got %d; want %d", len(b.tagFamilies), len(bi.tagFamilies)) } for i := range bi.tagFamilies { if bi.tagFamilies[i].name != b.tagFamilies[i].name { return fmt.Errorf("unexpected tag family name: got %q; want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name) } if len(bi.tagFamilies[i].tags) != len(b.tagFamilies[i].tags) { return fmt.Errorf("unexpected number of tags for tag family %q: got %d; want %d", bi.tagFamilies[i].name, len(b.tagFamilies[i].tags), len(bi.tagFamilies[i].tags)) } for j := range bi.tagFamilies[i].tags { if bi.tagFamilies[i].tags[j].name != b.tagFamilies[i].tags[j].name { return fmt.Errorf("unexpected tag name for tag family %q: got %q; want %q", bi.tagFamilies[i].name, b.tagFamilies[i].tags[j].name, bi.tagFamilies[i].tags[j].name) } assertIdxAndOffset(b.tagFamilies[i].tags[j].name, len(b.tagFamilies[i].tags[j].values), b.idx, offset) bi.tagFamilies[i].tags[j].values = append(bi.tagFamilies[i].tags[j].values, b.tagFamilies[i].tags[j].values[b.idx:offset]...) } } return nil } func fullTagAppend(bi, b *blockPointer, offset int) { existDataSize := len(bi.timestamps) appendTagFamilies := func(tf tagFamily) { tfv := tagFamily{name: tf.name} for i := range tf.tags { assertIdxAndOffset(tf.tags[i].name, len(tf.tags[i].values), b.idx, offset) col := tag{name: tf.tags[i].name, valueType: tf.tags[i].valueType} for j := 0; j < existDataSize; j++ { col.values = append(col.values, nil) } col.values = append(col.values, tf.tags[i].values[b.idx:offset]...) tfv.tags = append(tfv.tags, col) } bi.tagFamilies = append(bi.tagFamilies, tfv) } if len(bi.tagFamilies) == 0 { for _, tf := range b.tagFamilies { appendTagFamilies(tf) } return } tagFamilyMap := make(map[string]*tagFamily) for i := range bi.tagFamilies { tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i] } for _, tf := range b.tagFamilies { if existingTagFamily, exists := tagFamilyMap[tf.name]; exists { columnMap := make(map[string]*tag) for i := range existingTagFamily.tags { columnMap[existingTagFamily.tags[i].name] = &existingTagFamily.tags[i] } for _, c := range tf.tags { if existingColumn, exists := columnMap[c.name]; exists { assertIdxAndOffset(c.name, len(c.values), b.idx, offset) existingColumn.values = append(existingColumn.values, c.values[b.idx:offset]...) } else { assertIdxAndOffset(c.name, len(c.values), b.idx, offset) col := tag{name: c.name, valueType: c.valueType} for j := 0; j < existDataSize; j++ { col.values = append(col.values, nil) } col.values = append(col.values, c.values[b.idx:offset]...) existingTagFamily.tags = append(existingTagFamily.tags, col) } } } else { appendTagFamilies(tf) } } for k := range tagFamilyMap { delete(tagFamilyMap, k) } for i := range b.tagFamilies { tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i] } emptySize := offset - b.idx for _, tf := range bi.tagFamilies { if _, exists := tagFamilyMap[tf.name]; !exists { for i := range tf.tags { for j := 0; j < emptySize; j++ { tf.tags[i].values = append(tf.tags[i].values, nil) } } } else { existingTagFamily := tagFamilyMap[tf.name] columnMap := make(map[string]*tag) for i := range existingTagFamily.tags { columnMap[existingTagFamily.tags[i].name] = &existingTagFamily.tags[i] } for i := range tf.tags { if _, exists := columnMap[tf.tags[i].name]; !exists { for j := 0; j < emptySize; j++ { tf.tags[i].values = append(tf.tags[i].values, nil) } } } } } } func assertIdxAndOffset(name string, length int, idx int, offset int) { if idx >= offset { logger.Panicf("%q idx %d must be less than offset %d", name, idx, offset) } if offset > length { logger.Panicf("%q offset %d must be less than or equal to length %d", name, offset, length) } } func (bi *blockPointer) isFull() bool { return bi.bm.uncompressedSizeBytes >= maxUncompressedBlockSize } func (bi *blockPointer) reset() { bi.idx = 0 bi.block.reset() bi.bm = blockMetadata{} } func generateBlockPointer() *blockPointer { v := blockPointerPool.Get() if v == nil { return &blockPointer{} } return v } func releaseBlockPointer(bi *blockPointer) { bi.reset() blockPointerPool.Put(bi) } var blockPointerPool = pool.Register[*blockPointer]("stream-blockPointer")