banyand/measure/block.go (941 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package measure

import (
	"fmt"
	"slices"
	"sort"

	"github.com/apache/skywalking-banyandb/api/common"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	"github.com/apache/skywalking-banyandb/pkg/bytes"
	"github.com/apache/skywalking-banyandb/pkg/encoding"
	"github.com/apache/skywalking-banyandb/pkg/fs"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
	"github.com/apache/skywalking-banyandb/pkg/pool"
	"github.com/apache/skywalking-banyandb/pkg/query/model"
	"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

// block is the in-memory, column-oriented representation of a run of data
// points that all belong to one series. Timestamps and versions are stored
// as parallel slices; tag and field values are stored per column so that
// every column slice has exactly len(timestamps) entries.
type block struct {
	timestamps  []int64
	versions    []int64
	tagFamilies []columnFamily
	field       columnFamily
}

// reset clears the block for reuse while keeping the backing arrays
// (slices are truncated to length 0, not released), which matters because
// blocks are pooled via blockPool.
func (b *block) reset() {
	b.timestamps = b.timestamps[:0]
	b.versions = b.versions[:0]

	tff := b.tagFamilies
	for i := range tff {
		tff[i].reset()
	}
	b.tagFamilies = tff[:0]
	b.field.reset()
}

// mustInitFromDataPoints populates the block from row-oriented input: one
// timestamp/version/tagFamilies/fields entry per data point. It panics when
// the per-row slices disagree in length or when timestamps are unsorted.
func (b *block) mustInitFromDataPoints(timestamps []int64, versions []int64, tagFamilies [][]nameValues, fields []nameValues) {
	b.reset()
	size := len(timestamps)
	if size == 0 {
		return
	}
	if size != len(tagFamilies) {
		logger.Panicf("the number of timestamps %d must match the number of tagFamilies %d", size, len(tagFamilies))
	}
	if size != len(fields) {
		logger.Panicf("the number of timestamps %d must match the number of fields %d", size, len(fields))
	}

	assertTimestampsSorted(timestamps)
	b.timestamps = append(b.timestamps, timestamps...)
	b.versions = append(b.versions, versions...)
	b.mustInitFromTagsAndFields(tagFamilies, fields)
}

// assertTimestampsSorted panics unless timestamps are in non-decreasing order.
func assertTimestampsSorted(timestamps []int64) {
	for i := range timestamps {
		if i > 0 && timestamps[i-1] > timestamps[i] {
			logger.Panicf("data points must be sorted by timestamp; got the previous data point with bigger timestamp %d than the current data point with timestamp %d",
				timestamps[i-1], timestamps[i])
		}
	}
}

// mustInitFromTagsAndFields transposes row-oriented tags/fields into the
// block's column-oriented layout. Row i's value for a column lands at
// values[i] of that column.
// NOTE(review): the field loop calls resizeColumns(len(f.values)) once per
// row — this assumes every row carries the same set of fields; verify with
// callers.
func (b *block) mustInitFromTagsAndFields(tagFamilies [][]nameValues, fields []nameValues) {
	dataPointsLen := len(tagFamilies)
	if dataPointsLen == 0 {
		return
	}
	for i, tff := range tagFamilies {
		b.processTagFamilies(tff, i, dataPointsLen)
	}
	for i, f := range fields {
		columns := b.field.resizeColumns(len(f.values))
		for j, t := range f.values {
			columns[j].name = t.name
			columns[j].resizeValues(dataPointsLen)
			columns[j].valueType = t.valueType
			columns[j].values[i] = t.marshal()
		}
	}
}

// processTagFamilies writes row i's tag families into the block's columns.
func (b *block) processTagFamilies(tff []nameValues, i int, dataPointsLen int) {
	tagFamilies := b.resizeTagFamilies(len(tff))
	for j, tf := range tff {
		tagFamilies[j].name = tf.name
		b.processTags(tf, j, i, dataPointsLen)
	}
}

// processTags writes row i's tags of one family into that family's columns.
func (b *block) processTags(tf nameValues, columnFamilyIdx, i int, dataPointsLen int) {
	columns := b.tagFamilies[columnFamilyIdx].resizeColumns(len(tf.values))
	for j, t := range tf.values {
		columns[j].name = t.name
		columns[j].resizeValues(dataPointsLen)
		columns[j].valueType = t.valueType
		columns[j].values[i] = t.marshal()
	}
}

// resizeTagFamilies grows (reusing capacity when possible) and truncates
// b.tagFamilies to exactly tagFamiliesLen entries, returning the slice.
func (b *block) resizeTagFamilies(tagFamiliesLen int) []columnFamily {
	tff := b.tagFamilies[:0]
	if n := tagFamiliesLen - cap(tff); n > 0 {
		tff = append(tff[:cap(tff)], make([]columnFamily, n)...)
	}
	tff = tff[:tagFamiliesLen]
	b.tagFamilies = tff
	return tff
}

// Len returns the number of data points held by the block.
func (b *block) Len() int {
	return len(b.timestamps)
}

// mustWriteTo validates the block, fills bm with its metadata, and writes
// timestamps/versions, every tag family, and the field columns through the
// corresponding writers in ww.
func (b *block) mustWriteTo(sid common.SeriesID, bm *blockMetadata, ww *writers) {
	b.validate()
	bm.reset()

	bm.seriesID = sid
	bm.uncompressedSizeBytes = b.uncompressedSizeBytes()
	bm.count = uint64(b.Len())

	mustWriteTimestampsTo(&bm.timestamps, b.timestamps, b.versions, &ww.timestampsWriter)

	for ti := range b.tagFamilies {
		b.marshalTagFamily(b.tagFamilies[ti], bm, ww)
	}

	f := b.field
	cc := f.columns
	cmm := bm.field.resizeColumnMetadata(len(cc))
	for i := range cc {
		cc[i].mustWriteTo(&cmm[i], &ww.fieldValuesWriter)
	}
}

// validate panics unless timestamps are sorted and every tag/field column
// has exactly one value per timestamp.
func (b *block) validate() {
	timestamps := b.timestamps
	for i := 1; i < len(timestamps); i++ {
		if timestamps[i-1] > timestamps[i] {
			logger.Panicf("log entries must be sorted by timestamp; got the previous entry with bigger timestamp %d than the current entry with timestamp %d",
				timestamps[i-1], timestamps[i])
		}
	}

	itemsCount := len(timestamps)
	tff := b.tagFamilies
	for _, tf := range tff {
		for _, c := range tf.columns {
			if len(c.values) != itemsCount {
				logger.Panicf("unexpected number of values for tags %q: got %d; want %d", c.name, len(c.values), itemsCount)
			}
		}
	}
	ff := b.field
	for _, f := range ff.columns {
		if len(f.values) != itemsCount {
			logger.Panicf("unexpected number of values for fields %q: got %d; want %d", f.name, len(f.values), itemsCount)
		}
	}
}

// marshalTagFamily writes one tag family's column values through the column
// writer and its columnFamilyMetadata through the metadata writer, recording
// the metadata's offset/size in bm. Panics when the marshaled metadata
// exceeds maxTagFamiliesMetadataSize.
func (b *block) marshalTagFamily(tf columnFamily, bm *blockMetadata, ww *writers) {
	hw, w := ww.getColumnMetadataWriterAndColumnWriter(tf.name)
	cc := tf.columns
	cfm := generateColumnFamilyMetadata()
	cmm := cfm.resizeColumnMetadata(len(cc))
	for i := range cc {
		cc[i].mustWriteTo(&cmm[i], w)
	}
	bb := bigValuePool.Generate()
	defer bigValuePool.Release(bb)
	bb.Buf = cfm.marshal(bb.Buf)
	releaseColumnFamilyMetadata(cfm)
	tfm := bm.getTagFamilyMetadata(tf.name)
	tfm.offset = hw.bytesWritten
	tfm.size = uint64(len(bb.Buf))
	if tfm.size > maxTagFamiliesMetadataSize {
		logger.Panicf("too big columnFamilyMetadataSize: %d bytes; mustn't exceed %d bytes", tfm.size, maxTagFamiliesMetadataSize)
	}
	hw.MustWrite(bb.Buf)
}

// unmarshalTagFamily reads one tag family back from random-access readers,
// restricted to the projected tag names. A projected tag missing from the
// stored metadata is materialized as an all-nil column with
// pbv1.ValueTypeUnknown so the result still has one column per projection.
func (b *block) unmarshalTagFamily(decoder *encoding.BytesBlockDecoder, tfIndex int, name string,
	columnFamilyMetadataBlock *dataBlock, tagProjection []string, metaReader, valueReader fs.Reader, count int,
) {
	if len(tagProjection) < 1 {
		return
	}
	bb := bigValuePool.Generate()
	bb.Buf = bytes.ResizeExact(bb.Buf, int(columnFamilyMetadataBlock.size))
	fs.MustReadData(metaReader, int64(columnFamilyMetadataBlock.offset), bb.Buf)
	cfm := generateColumnFamilyMetadata()
	defer releaseColumnFamilyMetadata(cfm)
	_, err := cfm.unmarshal(bb.Buf)
	if err != nil {
		logger.Panicf("%s: cannot unmarshal columnFamilyMetadata: %v", metaReader.Path(), err)
	}
	bigValuePool.Release(bb)
	b.tagFamilies[tfIndex].name = name

	cc := b.tagFamilies[tfIndex].resizeColumns(len(tagProjection))
NEXT:
	for j := range tagProjection {
		for i := range cfm.columnMetadata {
			if tagProjection[j] == cfm.columnMetadata[i].name {
				cc[j].mustReadValues(decoder, valueReader, cfm.columnMetadata[i], uint64(b.Len()))
				continue NEXT
			}
		}
		// Projected tag not stored in this part: emit a null column.
		cc[j].name = tagProjection[j]
		cc[j].valueType = pbv1.ValueTypeUnknown
		cc[j].resizeValues(count)
		for k := range cc[j].values {
			cc[j].values[k] = nil
		}
	}
}

// unmarshalTagFamilyFromSeqReaders reads one tag family from sequential
// readers (used during merges, where parts are scanned front to back). The
// metadata offset must match the reader's current position exactly.
func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDecoder, tfIndex int, name string,
	columnFamilyMetadataBlock *dataBlock, metaReader, valueReader *seqReader,
) {
	if columnFamilyMetadataBlock.offset != metaReader.bytesRead {
		logger.Panicf("offset %d must be equal to bytesRead %d", columnFamilyMetadataBlock.offset, metaReader.bytesRead)
	}
	bb := bigValuePool.Generate()
	bb.Buf = bytes.ResizeExact(bb.Buf, int(columnFamilyMetadataBlock.size))
	metaReader.mustReadFull(bb.Buf)
	cfm := generateColumnFamilyMetadata()
	defer releaseColumnFamilyMetadata(cfm)
	_, err := cfm.unmarshal(bb.Buf)
	if err != nil {
		logger.Panicf("%s: cannot unmarshal columnFamilyMetadata: %v", metaReader.Path(), err)
	}
	bigValuePool.Release(bb)
	b.tagFamilies[tfIndex].name = name

	cc := b.tagFamilies[tfIndex].resizeColumns(len(cfm.columnMetadata))
	for i := range cfm.columnMetadata {
		cc[i].mustSeqReadValues(decoder, valueReader, cfm.columnMetadata[i], uint64(b.Len()))
	}
}

// uncompressedSizeBytes estimates the uncompressed payload size of the
// block: 16 bytes per row (timestamp + version) plus name+value bytes for
// every non-empty tag and field value.
// NOTE(review): within a tag family, nameLen accumulates every previous
// column's name length (`nameLen +=` inside the loop), so later columns are
// charged for earlier names too — confirm this inflation is intentional for
// the size heuristic.
func (b *block) uncompressedSizeBytes() uint64 {
	dataPointsCount := uint64(b.Len())
	n := dataPointsCount * (8 + 8) // 8 bytes for timestamp and 8 bytes for version

	tff := b.tagFamilies
	for i := range tff {
		tf := tff[i]
		nameLen := uint64(len(tf.name))
		for _, c := range tf.columns {
			nameLen += uint64(len(c.name))
			for _, v := range c.values {
				if len(v) > 0 {
					n += nameLen + uint64(len(v))
				}
			}
		}
	}

	ff := b.field
	for i := range ff.columns {
		c := ff.columns[i]
		nameLen := uint64(len(c.name))
		for _, v := range c.values {
			if len(v) > 0 {
				n += nameLen + uint64(len(v))
			}
		}
	}
	return n
}

// mustReadFrom rebuilds the block from a part using random-access readers,
// honoring bm.tagProjection: families absent from the part are filled with
// null columns so every projected family/tag is present in the result.
func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm blockMetadata) {
	b.reset()

	b.timestamps, b.versions = mustReadTimestampsFrom(b.timestamps, b.versions, &bm.timestamps, int(bm.count), p.timestamps)

	cc := b.field.resizeColumns(len(bm.field.columnMetadata))
	for i := range cc {
		cc[i].mustReadValues(decoder, p.fieldValues, bm.field.columnMetadata[i], bm.count)
	}

	_ = b.resizeTagFamilies(len(bm.tagProjection))
	for i := range bm.tagProjection {
		name := bm.tagProjection[i].Family
		block, ok := bm.tagFamilies[name]
		if !ok {
			// Whole family missing from this part: materialize null columns.
			b.tagFamilies[i].name = name
			b.tagFamilies[i].resizeColumns(len(bm.tagProjection[i].Names))
			for j := range bm.tagProjection[i].Names {
				b.tagFamilies[i].columns[j].name = bm.tagProjection[i].Names[j]
				b.tagFamilies[i].columns[j].valueType = pbv1.ValueTypeUnknown
				b.tagFamilies[i].columns[j].resizeValues(int(bm.count))
				for k := range bm.count {
					b.tagFamilies[i].columns[j].values[k] = nil
				}
			}
			continue
		}
		b.unmarshalTagFamily(decoder, i, name, block, bm.tagProjection[i].Names, p.tagFamilyMetadata[name], p.tagFamilies[name], int(bm.count))
	}
}

// mustSeqReadFrom rebuilds the block from sequential readers during merge.
// Tag families are read in sorted-name order to match the on-disk layout.
func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder, seqReaders *seqReaders, bm blockMetadata) {
	b.reset()

	b.timestamps, b.versions = mustSeqReadTimestampsFrom(b.timestamps, b.versions, &bm.timestamps, int(bm.count), &seqReaders.timestamps)

	cc := b.field.resizeColumns(len(bm.field.columnMetadata))
	for i := range cc {
		cc[i].mustSeqReadValues(decoder, &seqReaders.fieldValues, bm.field.columnMetadata[i], bm.count)
	}

	_ = b.resizeTagFamilies(len(bm.tagFamilies))
	keys := make([]string, 0, len(bm.tagFamilies))
	for k := range bm.tagFamilies {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for i, name := range keys {
		block := bm.tagFamilies[name]
		b.unmarshalTagFamilyFromSeqReaders(decoder, i, name, block, seqReaders.tagFamilyMetadata[name], seqReaders.tagFamilies[name])
	}
}

// For testing purpose only.
func (b *block) sortTagFamilies() {
	sort.Slice(b.tagFamilies, func(i, j int) bool {
		return b.tagFamilies[i].name < b.tagFamilies[j].name
	})
}

// mustWriteTimestampsTo encodes timestamps followed by versions into one
// contiguous region of the timestamps writer, recording the split point in
// tm.versionOffset and the total in tm.size.
func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps, versions []int64, timestampsWriter *writer) {
	tm.reset()

	bb := bigValuePool.Generate()
	defer bigValuePool.Release(bb)
	bb.Buf, tm.encodeType, tm.min = encoding.Int64ListToBytes(bb.Buf[:0], timestamps)
	tm.encodeType = encoding.GetVersionType(tm.encodeType)
	if tm.encodeType == encoding.EncodeTypeUnknown {
		logger.Panicf("unexpected encodeType %d", tm.encodeType)
		return
	}
	tm.max = timestamps[len(timestamps)-1]
	tm.offset = timestampsWriter.bytesWritten
	tm.versionOffset = uint64(len(bb.Buf))
	timestampsWriter.MustWrite(bb.Buf)
	bb.Buf, tm.versionEncodeType, tm.versionFirst = encoding.Int64ListToBytes(bb.Buf[:0], versions)
	tm.size = tm.versionOffset + uint64(len(bb.Buf))
	timestampsWriter.MustWrite(bb.Buf)
}

// mustReadTimestampsFrom reads the combined timestamps+versions region at
// tm.offset and decodes it, appending into the given slices.
func mustReadTimestampsFrom(timestamps, versions []int64, tm *timestampsMetadata, count int, reader fs.Reader) ([]int64, []int64) {
	bb := bigValuePool.Generate()
	defer bigValuePool.Release(bb)
	bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
	fs.MustReadData(reader, int64(tm.offset), bb.Buf)
	return mustDecodeTimestampsWithVersions(timestamps, versions, tm, count, reader.Path(), bb.Buf)
}

// mustSeqReadTimestampsFrom is the sequential-reader variant of
// mustReadTimestampsFrom; the region must start exactly at the reader's
// current position.
func mustSeqReadTimestampsFrom(timestamps, versions []int64, tm *timestampsMetadata, count int, reader *seqReader) ([]int64, []int64) {
	if tm.offset != reader.bytesRead {
		logger.Panicf("offset %d must be equal to bytesRead %d", tm.offset, reader.bytesRead)
	}
	bb := bigValuePool.Generate()
	defer bigValuePool.Release(bb)
	bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
	reader.mustReadFull(bb.Buf)
	return mustDecodeTimestampsWithVersions(timestamps, versions, tm, count, reader.Path(), bb.Buf)
}

// mustDecodeTimestampsWithVersions splits src at tm.versionOffset and
// decodes the timestamp list then the version list, panicking (with path
// context) on any inconsistency.
func mustDecodeTimestampsWithVersions(timestamps, versions []int64, tm *timestampsMetadata, count int, path string, src []byte) ([]int64, []int64) {
	var err error
	t := encoding.GetCommonType(tm.encodeType)
	if t == encoding.EncodeTypeUnknown {
		logger.Panicf("unexpected encodeType %d", tm.encodeType)
	}
	if tm.size < tm.versionOffset {
		logger.Panicf("size %d must be greater than versionOffset %d", tm.size, tm.versionOffset)
	}
	timestamps, err = encoding.BytesToInt64List(timestamps, src[:tm.versionOffset], t, tm.min, count)
	if err != nil {
		logger.Panicf("%s: cannot unmarshal timestamps with versions: %v", path, err)
	}
	versions, err = encoding.BytesToInt64List(versions, src[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count)
	if err != nil {
		logger.Panicf("%s: cannot unmarshal versions: %v", path, err)
	}
	return timestamps, versions
}

// generateBlock fetches a block from the pool, allocating on a cold pool.
func generateBlock() *block {
	v := blockPool.Get()
	if v == nil {
		return &block{}
	}
	return v
}

// releaseBlock resets and returns a block to the pool.
func releaseBlock(b *block) {
	b.reset()
	blockPool.Put(b)
}

var blockPool = pool.Register[*block]("measure-block")

// blockCursor iterates one block of a part during a query, holding only the
// projected columns within [minTimestamp, maxTimestamp]; idx is the cursor's
// current row.
type blockCursor struct {
	p                   *part
	fields              columnFamily
	timestamps          []int64
	versions            []int64
	tagFamilies         []columnFamily
	columnValuesDecoder encoding.BytesBlockDecoder
	tagProjection       []model.TagProjection
	fieldProjection     []string
	bm                  blockMetadata
	idx                 int
	minTimestamp        int64
	maxTimestamp        int64
}

// reset clears the cursor for pooled reuse, retaining slice capacity.
func (bc *blockCursor) reset() {
	bc.idx = 0
	bc.p = nil
	bc.bm.reset()
	bc.minTimestamp = 0
	bc.maxTimestamp = 0
	bc.tagProjection = bc.tagProjection[:0]
	bc.fieldProjection = bc.fieldProjection[:0]

	bc.timestamps = bc.timestamps[:0]
	bc.versions = bc.versions[:0]

	tff := bc.tagFamilies
	for i := range tff {
		tff[i].reset()
	}
	bc.tagFamilies = tff[:0]
	bc.fields.reset()
}

// init binds the cursor to a part/block and copies the query's time range
// and projections.
func (bc *blockCursor) init(p *part, bm *blockMetadata, queryOpts queryOptions) {
	bc.reset()
	bc.p = p
	bc.bm.copyFrom(bm)
	bc.minTimestamp = queryOpts.minTimestamp
	bc.maxTimestamp = queryOpts.maxTimestamp
	bc.tagProjection = queryOpts.TagProjection
	bc.fieldProjection = queryOpts.FieldProjection
}

// copyAllTo appends every remaining row of the cursor into r, reversing the
// appended values when desc is set. Tag values pinned by the series index
// (storedIndexValue) override column data; projected tags or families absent
// from the block are filled with nulls.
// NOTE(review): when the family lookup fails (cf == nil) the inner branch
// appends a null tag for every name in tp.Names and jumps to the next
// family — if an earlier name already matched indexValue, its tag appears
// twice in tf.Tags; confirm whether that interleaving can occur.
func (bc *blockCursor) copyAllTo(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
	tagProjection []model.TagProjection, desc bool,
) {
	var idx, offset int
	if desc {
		idx = 0
		offset = bc.idx + 1
	} else {
		idx = bc.idx
		offset = len(bc.timestamps)
	}
	if offset <= idx {
		return
	}
	size := offset - idx
	r.SID = bc.bm.seriesID
	r.Timestamps = append(r.Timestamps, bc.timestamps[idx:offset]...)
	r.Versions = append(r.Versions, bc.versions[idx:offset]...)
	if desc {
		slices.Reverse(r.Timestamps)
		slices.Reverse(r.Versions)
	}
	var indexValue map[string]*modelv1.TagValue
	if storedIndexValue != nil {
		indexValue = storedIndexValue[r.SID]
	}
OUTER:
	for _, tp := range tagProjection {
		tf := model.TagFamily{
			Name: tp.Family,
		}
		var cf *columnFamily
		for _, tagName := range tp.Names {
			t := model.Tag{
				Name: tagName,
			}
			if indexValue != nil && indexValue[tagName] != nil {
				// The index stores one value per series; replicate it per row.
				t.Values = make([]*modelv1.TagValue, size)
				for i := 0; i < size; i++ {
					t.Values[i] = indexValue[tagName]
				}
				tf.Tags = append(tf.Tags, t)
				continue
			}
			if cf == nil {
				for i := range bc.tagFamilies {
					if bc.tagFamilies[i].name == tp.Family {
						cf = &bc.tagFamilies[i]
						break
					}
				}
			}
			if cf == nil {
				// Family not loaded at all: null-fill every projected tag.
				for _, n := range tp.Names {
					t = model.Tag{
						Name:   n,
						Values: make([]*modelv1.TagValue, size),
					}
					for i := 0; i < size; i++ {
						t.Values[i] = pbv1.NullTagValue
					}
					tf.Tags = append(tf.Tags, t)
				}
				r.TagFamilies = append(r.TagFamilies, tf)
				continue OUTER
			}
			var foundTag bool
			for i := range cf.columns {
				if cf.columns[i].name == tagName {
					for _, v := range cf.columns[i].values[idx:offset] {
						t.Values = append(t.Values, mustDecodeTagValue(cf.columns[i].valueType, v))
					}
					foundTag = true
					break
				}
			}
			if !foundTag {
				t.Values = make([]*modelv1.TagValue, size)
				for i := 0; i < size; i++ {
					t.Values[i] = pbv1.NullTagValue
				}
			} else if desc {
				slices.Reverse(t.Values)
			}
			tf.Tags = append(tf.Tags, t)
		}
		r.TagFamilies = append(r.TagFamilies, tf)
	}
	for _, c := range bc.fields.columns {
		f := model.Field{
			Name: c.name,
		}
		for _, v := range c.values[idx:offset] {
			f.Values = append(f.Values, mustDecodeFieldValue(c.valueType, v))
		}
		if desc {
			slices.Reverse(f.Values)
		}
		r.Fields = append(r.Fields, f)
	}
}

// copyTo appends only the cursor's current row (bc.idx) into r, creating
// the tag-family/field skeleton from the projections on first use. Missing
// families or tags yield pbv1.NullTagValue.
func (bc *blockCursor) copyTo(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
	tagProjection []model.TagProjection,
) {
	r.SID = bc.bm.seriesID
	r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx])
	r.Versions = append(r.Versions, bc.versions[bc.idx])
	var indexValue map[string]*modelv1.TagValue
	if storedIndexValue != nil {
		indexValue = storedIndexValue[r.SID]
	}
	if len(r.TagFamilies) == 0 {
		for _, tp := range tagProjection {
			tf := model.TagFamily{
				Name: tp.Family,
			}
			for _, n := range tp.Names {
				t := model.Tag{
					Name: n,
				}
				tf.Tags = append(tf.Tags, t)
			}
			r.TagFamilies = append(r.TagFamilies, tf)
		}
	}
	for i := range r.TagFamilies {
		tfName := r.TagFamilies[i].Name
		var cf *columnFamily
		for j := range r.TagFamilies[i].Tags {
			tagName := r.TagFamilies[i].Tags[j].Name
			if indexValue != nil && indexValue[tagName] != nil {
				r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, indexValue[tagName])
				continue
			}
			if cf == nil {
				for i := range bc.tagFamilies {
					if bc.tagFamilies[i].name == tfName {
						cf = &bc.tagFamilies[i]
						break
					}
				}
			}
			if cf == nil {
				r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, pbv1.NullTagValue)
				continue
			}
			var foundTag bool
			for _, c := range cf.columns {
				if c.name == tagName {
					r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, mustDecodeTagValue(c.valueType, c.values[bc.idx]))
					foundTag = true
					break
				}
			}
			if !foundTag {
				r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, pbv1.NullTagValue)
			}
		}
	}
	if len(r.Fields) == 0 {
		for _, n := range bc.fieldProjection {
			f := model.Field{
				Name: n,
			}
			r.Fields = append(r.Fields, f)
		}
	}
	for i, c := range bc.fields.columns {
		r.Fields[i].Values = append(r.Fields[i].Values, mustDecodeFieldValue(c.valueType, c.values[bc.idx]))
	}
}

// replace overwrites the LAST row already present in r with the cursor's
// current row — used when a newer version of the same timestamp supersedes
// a previously copied one.
// NOTE(review): unlike copyTo, there is no `cf == nil` guard before ranging
// over cf.columns — if the tag family named in r is not present in
// bc.tagFamilies this dereferences a nil pointer; confirm callers guarantee
// the family exists.
func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue) {
	r.SID = bc.bm.seriesID
	r.Timestamps[len(r.Timestamps)-1] = bc.timestamps[bc.idx]
	r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
	var indexValue map[string]*modelv1.TagValue
	if storedIndexValue != nil {
		indexValue = storedIndexValue[r.SID]
	}
	for i := range r.TagFamilies {
		tfName := r.TagFamilies[i].Name
		var cf *columnFamily
		for j := range r.TagFamilies[i].Tags {
			tagName := r.TagFamilies[i].Tags[j].Name
			if indexValue != nil && indexValue[tagName] != nil {
				r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = indexValue[tagName]
				continue
			}
			if cf == nil {
				for i := range bc.tagFamilies {
					if bc.tagFamilies[i].name == tfName {
						cf = &bc.tagFamilies[i]
						break
					}
				}
			}
			for _, c := range cf.columns {
				if c.name == tagName {
					r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = mustDecodeTagValue(c.valueType, c.values[bc.idx])
					break
				}
			}
		}
	}
	for i, c := range bc.fields.columns {
		r.Fields[i].Values[len(r.Fields[i].Values)-1] = mustDecodeFieldValue(c.valueType, c.values[bc.idx])
	}
}

// loadData reads the cursor's block from its part into tmpBlock (restricted
// to the field/tag projections), then copies the rows falling inside the
// cursor's time range into the cursor. Returns false when no row is in range.
func (bc *blockCursor) loadData(tmpBlock *block) bool {
	tmpBlock.reset()
	// Rewrite the field metadata to match the projection order; unknown
	// fields get a placeholder entry with ValueTypeUnknown.
	cfm := make([]columnMetadata, 0, len(bc.fieldProjection))
NEXT_FIELD:
	for _, fp := range bc.fieldProjection {
		for _, cm := range bc.bm.field.columnMetadata {
			if cm.name == fp {
				cfm = append(cfm, cm)
				continue NEXT_FIELD
			}
		}
		cfm = append(cfm, columnMetadata{
			name:      fp,
			valueType: pbv1.ValueTypeUnknown,
		})
	}
	bc.bm.field.columnMetadata = cfm
	bc.bm.tagProjection = bc.tagProjection
	// Keep only the tag families named by the projection.
	var tf map[string]*dataBlock
	for _, tp := range bc.tagProjection {
		for tfName, block := range bc.bm.tagFamilies {
			if tp.Family == tfName {
				if tf == nil {
					tf = make(map[string]*dataBlock, len(bc.tagProjection))
				}
				tf[tfName] = block
			}
		}
	}
	bc.bm.tagFamilies = tf
	tmpBlock.mustReadFrom(&bc.columnValuesDecoder, bc.p, bc.bm)

	start, end, ok := timestamp.FindRange(tmpBlock.timestamps, bc.minTimestamp, bc.maxTimestamp)
	if !ok {
		return false
	}
	bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[start:end+1]...)
	bc.versions = append(bc.versions, tmpBlock.versions[start:end+1]...)

	for _, cf := range tmpBlock.tagFamilies {
		tf := columnFamily{
			name: cf.name,
		}
		for i := range cf.columns {
			column := column{
				name:      cf.columns[i].name,
				valueType: cf.columns[i].valueType,
			}
			if len(cf.columns[i].values) == 0 {
				continue
			}
			if len(cf.columns[i].values) != len(tmpBlock.timestamps) {
				logger.Panicf("unexpected number of values for tags %q: got %d; want %d", cf.columns[i].name, len(cf.columns[i].values), len(tmpBlock.timestamps))
			}
			column.values = append(column.values, cf.columns[i].values[start:end+1]...)
			tf.columns = append(tf.columns, column)
		}
		bc.tagFamilies = append(bc.tagFamilies, tf)
	}
	bc.fields.name = tmpBlock.field.name
	for i := range tmpBlock.field.columns {
		if len(tmpBlock.field.columns[i].values) == 0 {
			continue
		}
		if len(tmpBlock.field.columns[i].values) != len(tmpBlock.timestamps) {
			logger.Panicf("unexpected number of values for fields %q: got %d; want %d", tmpBlock.field.columns[i].name, len(tmpBlock.field.columns[i].values), len(tmpBlock.timestamps))
		}
		c := column{
			name:      tmpBlock.field.columns[i].name,
			valueType: tmpBlock.field.columns[i].valueType,
		}
		c.values = append(c.values, tmpBlock.field.columns[i].values[start:end+1]...)
		bc.fields.columns = append(bc.fields.columns, c)
	}
	return true
}

var blockCursorPool = pool.Register[*blockCursor]("measure-blockCursor")

// generateBlockCursor fetches a cursor from the pool, allocating if empty.
func generateBlockCursor() *blockCursor {
	v := blockCursorPool.Get()
	if v == nil {
		return &blockCursor{}
	}
	return v
}

// releaseBlockCursor resets and returns a cursor to the pool.
func releaseBlockCursor(bc *blockCursor) {
	bc.reset()
	blockCursorPool.Put(bc)
}

// blockPointer is a block plus its metadata and a read index, used while
// merging multiple source blocks into one output block.
type blockPointer struct {
	block
	bm  blockMetadata
	idx int
}

// updateMetadata refreshes the min/max timestamps in bm from the embedded
// block's data.
func (bi *blockPointer) updateMetadata() {
	if len(bi.block.timestamps) == 0 {
		return
	}
	// only update timestamps since they are used for merging
	// blockWriter will recompute all fields
	bi.bm.timestamps.min = bi.block.timestamps[0]
	bi.bm.timestamps.max = bi.block.timestamps[len(bi.timestamps)-1]
}

// copyFrom resets bi and copies src's metadata and all of src's rows.
func (bi *blockPointer) copyFrom(src *blockPointer) {
	bi.reset()
	bi.bm.copyFrom(&src.bm)
	bi.appendAll(src)
}

// appendAll appends every row of b (from b.idx onward) into bi.
func (bi *blockPointer) appendAll(b *blockPointer) {
	if len(b.timestamps) == 0 {
		return
	}
	bi.append(b, len(b.timestamps))
}

var log = logger.GetLogger("measure").Named("block")

// append copies rows [b.idx, offset) of b into bi. It first tries the fast
// path (identical tag/field schemas, plain column-wise append) and falls
// back to the full path, which reconciles differing schemas by null-padding.
// NOTE(review): the timestamps/versions assertions pass bi.idx where every
// other call site passes b.idx — since the slice operations use b.idx,
// confirm whether bi.idx is intended here.
func (bi *blockPointer) append(b *blockPointer, offset int) {
	if offset <= b.idx {
		return
	}
	if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
		fullTagAppend(bi, b, offset)
	} else {
		if err := fastTagAppend(bi, b, offset); err != nil {
			if log.Debug().Enabled() {
				log.Debug().Msgf("fastTagMerge failed: %v; falling back to fullTagMerge", err)
			}
			fullTagAppend(bi, b, offset)
		}
	}

	if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
		fullFieldAppend(bi, b, offset)
	} else {
		if err := fastFieldAppend(bi, b, offset); err != nil {
			if log.Debug().Enabled() {
				log.Debug().Msgf("fastFieldAppend failed: %v; falling back to fullFieldAppend", err)
			}
			fullFieldAppend(bi, b, offset)
		}
	}

	assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
	bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
	assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
	bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
}

// fastTagAppend appends b's tag columns onto bi's when both sides have an
// identical family/column layout; otherwise it returns an error so the
// caller can fall back to fullTagAppend.
func fastTagAppend(bi, b *blockPointer, offset int) error {
	if len(bi.tagFamilies) != len(b.tagFamilies) {
		return fmt.Errorf("unexpected number of tag families: got %d; want %d", len(b.tagFamilies), len(bi.tagFamilies))
	}
	for i := range bi.tagFamilies {
		if bi.tagFamilies[i].name != b.tagFamilies[i].name {
			return fmt.Errorf("unexpected tag family name: got %q; want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name)
		}
		if len(bi.tagFamilies[i].columns) != len(b.tagFamilies[i].columns) {
			return fmt.Errorf("unexpected number of tags for tag family %q: got %d; want %d",
				bi.tagFamilies[i].name, len(b.tagFamilies[i].columns), len(bi.tagFamilies[i].columns))
		}
		for j := range bi.tagFamilies[i].columns {
			if bi.tagFamilies[i].columns[j].name != b.tagFamilies[i].columns[j].name {
				return fmt.Errorf("unexpected tag name for tag family %q: got %q; want %q",
					bi.tagFamilies[i].name, b.tagFamilies[i].columns[j].name, bi.tagFamilies[i].columns[j].name)
			}
			assertIdxAndOffset(b.tagFamilies[i].columns[j].name, len(b.tagFamilies[i].columns[j].values), b.idx, offset)
			bi.tagFamilies[i].columns[j].values = append(bi.tagFamilies[i].columns[j].values, b.tagFamilies[i].columns[j].values[b.idx:offset]...)
		}
	}
	return nil
}

// fullTagAppend merges b's tag columns into bi even when the two sides have
// different families/columns: columns new to bi are prefixed with nil for
// bi's existing rows, and bi columns that b lacks are suffixed with nil for
// the newly appended rows, keeping every column the same length.
func fullTagAppend(bi, b *blockPointer, offset int) {
	existDataSize := len(bi.timestamps)
	appendTagFamilies := func(tf columnFamily) {
		tagFamily := columnFamily{name: tf.name}
		for i := range tf.columns {
			assertIdxAndOffset(tf.columns[i].name, len(tf.columns[i].values), b.idx, offset)
			col := column{name: tf.columns[i].name, valueType: tf.columns[i].valueType}
			for j := 0; j < existDataSize; j++ {
				col.values = append(col.values, nil)
			}
			col.values = append(col.values, tf.columns[i].values[b.idx:offset]...)
			tagFamily.columns = append(tagFamily.columns, col)
		}
		bi.tagFamilies = append(bi.tagFamilies, tagFamily)
	}
	if len(bi.tagFamilies) == 0 {
		for _, tf := range b.tagFamilies {
			appendTagFamilies(tf)
		}
		return
	}

	tagFamilyMap := make(map[string]*columnFamily)
	for i := range bi.tagFamilies {
		tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i]
	}

	for _, tf := range b.tagFamilies {
		if existingTagFamily, exists := tagFamilyMap[tf.name]; exists {
			columnMap := make(map[string]*column)
			for i := range existingTagFamily.columns {
				columnMap[existingTagFamily.columns[i].name] = &existingTagFamily.columns[i]
			}
			for _, c := range tf.columns {
				if existingColumn, exists := columnMap[c.name]; exists {
					assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
					existingColumn.values = append(existingColumn.values, c.values[b.idx:offset]...)
				} else {
					assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
					col := column{name: c.name, valueType: c.valueType}
					for j := 0; j < existDataSize; j++ {
						col.values = append(col.values, nil)
					}
					col.values = append(col.values, c.values[b.idx:offset]...)
					existingTagFamily.columns = append(existingTagFamily.columns, col)
				}
			}
		} else {
			appendTagFamilies(tf)
		}
	}
	// Reuse the map to index b's families, then null-pad any bi column that
	// received no data from b.
	for k := range tagFamilyMap {
		delete(tagFamilyMap, k)
	}
	for i := range b.tagFamilies {
		tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i]
	}
	emptySize := offset - b.idx
	for _, tf := range bi.tagFamilies {
		if _, exists := tagFamilyMap[tf.name]; !exists {
			for i := range tf.columns {
				for j := 0; j < emptySize; j++ {
					tf.columns[i].values = append(tf.columns[i].values, nil)
				}
			}
		} else {
			existingTagFamily := tagFamilyMap[tf.name]
			columnMap := make(map[string]*column)
			for i := range existingTagFamily.columns {
				columnMap[existingTagFamily.columns[i].name] = &existingTagFamily.columns[i]
			}
			for i := range tf.columns {
				if _, exists := columnMap[tf.columns[i].name]; !exists {
					for j := 0; j < emptySize; j++ {
						tf.columns[i].values = append(tf.columns[i].values, nil)
					}
				}
			}
		}
	}
}

// fastFieldAppend appends b's field columns onto bi's when both sides have
// an identical field layout; otherwise it returns an error so the caller
// can fall back to fullFieldAppend.
func fastFieldAppend(bi, b *blockPointer, offset int) error {
	if len(bi.field.columns) != len(b.field.columns) {
		return fmt.Errorf("unexpected number of fields: got %d; want %d", len(bi.field.columns), len(b.field.columns))
	}
	for i := range bi.field.columns {
		if bi.field.columns[i].name != b.field.columns[i].name {
			return fmt.Errorf("unexpected field name: got %q; want %q", b.field.columns[i].name, bi.field.columns[i].name)
		}
		assertIdxAndOffset(b.field.columns[i].name, len(b.field.columns[i].values), b.idx, offset)
		bi.field.columns[i].values = append(bi.field.columns[i].values, b.field.columns[i].values[b.idx:offset]...)
	}
	return nil
}

// fullFieldAppend merges b's field columns into bi with null-padding, the
// field-level analogue of fullTagAppend.
func fullFieldAppend(bi, b *blockPointer, offset int) {
	existDataSize := len(bi.timestamps)
	appendFields := func(c column) {
		col := column{name: c.name, valueType: c.valueType}
		for j := 0; j < existDataSize; j++ {
			col.values = append(col.values, nil)
		}
		col.values = append(col.values, c.values[b.idx:offset]...)
		bi.field.columns = append(bi.field.columns, col)
	}
	if len(bi.field.columns) == 0 {
		for _, c := range b.field.columns {
			appendFields(c)
		}
		return
	}

	fieldMap := make(map[string]*column)
	for i := range bi.field.columns {
		fieldMap[bi.field.columns[i].name] = &bi.field.columns[i]
	}

	for _, c := range b.field.columns {
		if existingField, exists := fieldMap[c.name]; exists {
			assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
			existingField.values = append(existingField.values, c.values[b.idx:offset]...)
		} else {
			appendFields(c)
		}
	}
	// Null-pad bi fields that b did not supply.
	for k := range fieldMap {
		delete(fieldMap, k)
	}
	for i := range b.field.columns {
		fieldMap[b.field.columns[i].name] = &b.field.columns[i]
	}
	emptySize := offset - b.idx
	for i := range bi.field.columns {
		if _, exists := fieldMap[bi.field.columns[i].name]; !exists {
			for j := 0; j < emptySize; j++ {
				bi.field.columns[i].values = append(bi.field.columns[i].values, nil)
			}
		}
	}
}

// assertIdxAndOffset panics unless idx < offset <= length.
func assertIdxAndOffset(name string, length int, idx int, offset int) {
	if idx >= offset {
		logger.Panicf("%q idx %d must be less than offset %d", name, idx, offset)
	}
	if offset > length {
		logger.Panicf("%q offset %d must be less than or equal to length %d", name, offset, length)
	}
}

// isFull reports whether the pointer's block has reached either the row
// count or uncompressed-size limit for a single output block.
func (bi *blockPointer) isFull() bool {
	return bi.bm.count >= maxBlockLength || bi.bm.uncompressedSizeBytes >= maxUncompressedBlockSize
}

// reset clears the pointer for pooled reuse.
func (bi *blockPointer) reset() {
	bi.idx = 0
	bi.block.reset()
	bi.bm.reset()
}

// generateBlockPointer fetches a pointer from the pool, allocating if empty.
func generateBlockPointer() *blockPointer {
	v := blockPointerPool.Get()
	if v == nil {
		return &blockPointer{}
	}
	return v
}

// releaseBlockPointer resets and returns a pointer to the pool.
func releaseBlockPointer(bi *blockPointer) {
	bi.reset()
	blockPointerPool.Put(bi)
}

var blockPointerPool = pool.Register[*blockPointer]("measure-blockPointer")