banyand/measure/datapoints.go (194 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"bytes"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/index"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type nameValue struct {
name string
value []byte
valueArr [][]byte
valueType pbv1.ValueType
}
func (n *nameValue) reset() {
n.name = ""
n.value = nil
n.valueArr = nil
}
func generateNameValue() *nameValue {
v := nameValuePool.Get()
if v == nil {
return &nameValue{}
}
return v
}
func releaseNameValue(v *nameValue) {
v.reset()
nameValuePool.Put(v)
}
var nameValuePool = pool.Register[*nameValue]("measure-nameValue")
func (n *nameValue) size() int {
s := len(n.name)
if n.value != nil {
s += len(n.value)
}
if n.valueArr != nil {
for i := range n.valueArr {
s += len(n.valueArr[i])
}
}
return s
}
func (n *nameValue) marshal() []byte {
if n.valueArr != nil {
var dst []byte
for i := range n.valueArr {
if n.valueType == pbv1.ValueTypeInt64Arr {
dst = append(dst, n.valueArr[i]...)
continue
}
dst = marshalVarArray(dst, n.valueArr[i])
}
return dst
}
return n.value
}
const (
entityDelimiter = '|'
escape = '\\'
)
func marshalVarArray(dest, src []byte) []byte {
if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src, escape) < 0 {
dest = append(dest, src...)
dest = append(dest, entityDelimiter)
return dest
}
for _, b := range src {
if b == entityDelimiter || b == escape {
dest = append(dest, escape)
}
dest = append(dest, b)
}
dest = append(dest, entityDelimiter)
return dest
}
func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
if len(src) == 0 {
return nil, nil, errors.New("empty entity value")
}
if src[0] == entityDelimiter {
return dest, src[1:], nil
}
for len(src) > 0 {
switch {
case src[0] == escape:
if len(src) < 2 {
return nil, nil, errors.New("invalid escape character")
}
src = src[1:]
dest = append(dest, src[0])
case src[0] == entityDelimiter:
return dest, src[1:], nil
default:
dest = append(dest, src[0])
}
src = src[1:]
}
return nil, nil, errors.New("invalid variable array")
}
type nameValues struct {
name string
values []*nameValue
}
func (n *nameValues) reset() {
n.name = ""
for i := range n.values {
releaseNameValue(n.values[i])
}
n.values = n.values[:0]
}
type dataPoints struct {
seriesIDs []common.SeriesID
timestamps []int64
versions []int64
tagFamilies [][]nameValues
fields []nameValues
}
func (d *dataPoints) reset() {
d.seriesIDs = d.seriesIDs[:0]
d.timestamps = d.timestamps[:0]
d.versions = d.versions[:0]
for i := range d.tagFamilies {
for j := range d.tagFamilies[i] {
d.tagFamilies[i][j].reset()
}
}
d.tagFamilies = d.tagFamilies[:0]
for i := range d.fields {
d.fields[i].reset()
}
d.fields = d.fields[:0]
}
func (d *dataPoints) skip(i int) {
if len(d.timestamps) <= i {
return
}
d.seriesIDs = append(d.seriesIDs[:i], d.seriesIDs[i+1:]...)
d.timestamps = append(d.timestamps[:i], d.timestamps[i+1:]...)
d.versions = append(d.versions[:i], d.versions[i+1:]...)
d.tagFamilies = append(d.tagFamilies[:i], d.tagFamilies[i+1:]...)
if len(d.fields) > 0 {
d.fields = append(d.fields[:i], d.fields[i+1:]...)
}
}
func (d *dataPoints) Len() int {
return len(d.seriesIDs)
}
func (d *dataPoints) Less(i, j int) bool {
if d.seriesIDs[i] != d.seriesIDs[j] {
return d.seriesIDs[i] < d.seriesIDs[j]
}
if d.timestamps[i] != d.timestamps[j] {
return d.timestamps[i] < d.timestamps[j]
}
return d.versions[i] > d.versions[j]
}
func (d *dataPoints) Swap(i, j int) {
d.seriesIDs[i], d.seriesIDs[j] = d.seriesIDs[j], d.seriesIDs[i]
d.timestamps[i], d.timestamps[j] = d.timestamps[j], d.timestamps[i]
d.versions[i], d.versions[j] = d.versions[j], d.versions[i]
d.tagFamilies[i], d.tagFamilies[j] = d.tagFamilies[j], d.tagFamilies[i]
d.fields[i], d.fields[j] = d.fields[j], d.fields[i]
}
func generateDataPoints() *dataPoints {
v := dataPointsPool.Get()
if v == nil {
return &dataPoints{}
}
return v
}
func releaseDataPoints(v *dataPoints) {
v.reset()
dataPointsPool.Put(v)
}
var dataPointsPool = pool.Register[*dataPoints]("measure-dataPoints")
type dataPointsInTable struct {
tsTable *tsTable
dataPoints *dataPoints
timeRange timestamp.TimeRange
}
type dataPointsInGroup struct {
tsdb storage.TSDB[*tsTable, option]
metadataDocs index.Documents
indexModeDocs index.Documents
metadataDocMap map[uint64]int
indexModeDocMap map[uint64]int
tables []*dataPointsInTable
segments []storage.Segment[*tsTable, option]
latestTS int64
}