banyand/stream/elements.go (172 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "bytes" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/index" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type tagValue struct { tag string value []byte valueArr [][]byte valueType pbv1.ValueType } func (t *tagValue) reset() { t.tag = "" t.value = nil t.valueArr = nil } func (t *tagValue) size() int { s := len(t.tag) if t.value != nil { s += len(t.value) } if t.valueArr != nil { for i := range t.valueArr { s += len(t.valueArr[i]) } } return s } func (t *tagValue) marshal() []byte { if t.valueArr != nil { var dst []byte for i := range t.valueArr { if t.valueType == pbv1.ValueTypeInt64Arr { dst = append(dst, t.valueArr[i]...) continue } dst = marshalVarArray(dst, t.valueArr[i]) } return dst } return t.value } func generateTagValue() *tagValue { v := tagValuePool.Get() if v == nil { return &tagValue{} } return v } func releaseTagValue(v *tagValue) { v.reset() tagValuePool.Put(v) } var tagValuePool = pool.Register[*tagValue]("stream-tagValue") const ( entityDelimiter = '|' escape = '\\' ) func marshalVarArray(dest, src []byte) []byte { if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src, escape) < 0 { dest = append(dest, src...) dest = append(dest, entityDelimiter) return dest } for _, b := range src { if b == entityDelimiter || b == escape { dest = append(dest, escape) } dest = append(dest, b) } dest = append(dest, entityDelimiter) return dest } func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) { if len(src) == 0 { return nil, nil, errors.New("empty entity value") } if src[0] == entityDelimiter { return dest, src[1:], nil } for len(src) > 0 { switch { case src[0] == escape: if len(src) < 2 { return nil, nil, errors.New("invalid escape character") } src = src[1:] dest = append(dest, src[0]) case src[0] == entityDelimiter: return dest, src[1:], nil default: dest = append(dest, src[0]) } src = src[1:] } return nil, nil, errors.New("invalid variable array") } type tagValues struct { tag string values []*tagValue } func (t *tagValues) reset() { t.tag = "" for i := range t.values { releaseTagValue(t.values[i]) } t.values = t.values[:0] } type elements struct { seriesIDs []common.SeriesID timestamps []int64 elementIDs []uint64 tagFamilies [][]tagValues } func (e *elements) reset() { e.seriesIDs = e.seriesIDs[:0] e.timestamps = e.timestamps[:0] e.elementIDs = e.elementIDs[:0] for i := range e.tagFamilies { for j := range e.tagFamilies[i] { e.tagFamilies[i][j].reset() } } e.tagFamilies = e.tagFamilies[:0] } func (e *elements) Len() int { return len(e.seriesIDs) } func (e *elements) Less(i, j int) bool { if e.seriesIDs[i] != e.seriesIDs[j] { return e.seriesIDs[i] < e.seriesIDs[j] } return e.timestamps[i] < e.timestamps[j] } func (e *elements) Swap(i, j int) { e.seriesIDs[i], e.seriesIDs[j] = e.seriesIDs[j], e.seriesIDs[i] e.timestamps[i], e.timestamps[j] = e.timestamps[j], e.timestamps[i] e.elementIDs[i], e.elementIDs[j] = e.elementIDs[j], e.elementIDs[i] e.tagFamilies[i], e.tagFamilies[j] = e.tagFamilies[j], e.tagFamilies[i] } func generateElements() *elements { v := elementsPool.Get() if v == nil { return &elements{} } return v } func releaseElements(e *elements) { e.reset() elementsPool.Put(e) } var elementsPool = pool.Register[*elements]("stream-elements") type elementsInTable struct { timeRange timestamp.TimeRange tsTable *tsTable elements *elements docs index.Documents } type elementsInGroup struct { tsdb storage.TSDB[*tsTable, option] docIDsAdded map[uint64]struct{} docs index.Documents tables []*elementsInTable segments []storage.Segment[*tsTable, option] latestTS int64 }