pkg/pb/v1/series.go (121 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package v1 import ( "sort" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" ) // Series denotes a series of data points. type Series struct { Subject string EntityValues []*modelv1.TagValue Buffer []byte ID common.SeriesID } // CopyTo copies the content of the series to the destination series. func (s *Series) CopyTo(dst *Series) { dst.Subject = s.Subject dst.EntityValues = make([]*modelv1.TagValue, len(s.EntityValues)) copy(dst.EntityValues, s.EntityValues) } // Marshal encodes series to internal Buffer and generates ID. func (s *Series) Marshal() error { s.Buffer = marshalEntityValue(s.Buffer, convert.StringToBytes(s.Subject)) var err error s.Buffer, err = MarshalTagValues(s.Buffer, s.EntityValues) if err != nil { return errors.WithMessage(err, "marshal subject and entity values") } s.ID = common.SeriesID(convert.Hash(s.Buffer)) return nil } // MarshalWithWildcard encodes series to internal Buffer and generates ID with wildcard. func (s *Series) MarshalWithWildcard() error { s.Buffer = marshalEntityValue(s.Buffer, convert.StringToBytes(s.Subject)) var err error for _, tv := range s.EntityValues { if s.Buffer, err = marshalTagValueWithWildcard(s.Buffer, tv); err != nil { return errors.WithMessage(err, "marshal subject and entity values") } } s.ID = common.SeriesID(convert.Hash(s.Buffer)) return nil } // Unmarshal decodes series from internal Buffer. func (s *Series) Unmarshal(src []byte) error { s.ID = common.SeriesID(convert.Hash(src)) var err error s.Buffer = s.Buffer[:0] if s.Buffer, src, err = unmarshalEntityValue(s.Buffer, src); err != nil { return errors.WithMessage(err, "unmarshal subject") } s.Subject = string(s.Buffer) for len(src) > 0 { s.Buffer = s.Buffer[:0] var tv *modelv1.TagValue if s.Buffer, src, tv, err = unmarshalTagValue(s.Buffer, src); err != nil { return errors.WithMessagef(err, "unmarshal tag value [%s], marshaled %s", src, s.EntityValues) } s.EntityValues = append(s.EntityValues, tv) } return nil } func (s *Series) reset() { s.ID = 0 s.Subject = "" s.EntityValues = s.EntityValues[:0] s.Buffer = s.Buffer[:0] } // SeriesList is a collection of Series. type SeriesList []*Series func (a SeriesList) Len() int { return len(a) } func (a SeriesList) Less(i, j int) bool { return a[i].ID < a[j].ID } func (a SeriesList) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // Merge other SeriesList with this one to create a new SeriesList. func (a SeriesList) Merge(other SeriesList) SeriesList { if len(other) == 0 { return a } sort.Sort(other) if len(a) == 0 { return other } final := SeriesList{} i := 0 j := 0 for i < len(a) && j < len(other) { if a[i].ID < other[j].ID { final = append(final, a[i]) i++ } else { // deduplication if a[i].ID == other[j].ID { i++ } final = append(final, other[j]) j++ } } for ; i < len(a); i++ { final = append(final, a[i]) } for ; j < len(other); j++ { final = append(final, other[j]) } return final } // ToList converts SeriesList to posting.List. func (a SeriesList) ToList() posting.List { pl := roaring.NewPostingList() for _, v := range a { pl.Insert(uint64(v.ID)) } return pl } // IDs returns the IDs of the SeriesList. func (a SeriesList) IDs() []common.SeriesID { ids := make([]common.SeriesID, 0, len(a)) for _, v := range a { ids = append(ids, v.ID) } return ids }