banyand/property/shard.go (129 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package property
import (
"context"
"fmt"
"path"
"strconv"
"google.golang.org/protobuf/encoding/protojson"
"github.com/apache/skywalking-banyandb/api/common"
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
)
const (
shardTemplate = "shard-%d"
sourceField = "_source"
groupField = "_group"
nameField = index.IndexModeName
entityID = "_entity_id"
)
var (
sourceFieldKey = index.FieldKey{TagName: sourceField}
entityFieldKey = index.FieldKey{TagName: entityID}
groupFieldKey = index.FieldKey{TagName: groupField}
nameFieldKey = index.FieldKey{TagName: nameField}
projection = []index.FieldKey{sourceFieldKey}
)
type shard struct {
store index.SeriesStore
l *logger.Logger
location string
id common.ShardID
}
func (s *shard) close() error {
if s.store != nil {
return s.store.Close()
}
return nil
}
func (db *database) newShard(ctx context.Context, id common.ShardID, flushTimeoutSeconds int64,
) (*shard, error) {
location := path.Join(db.location, fmt.Sprintf(shardTemplate, int(id)))
sName := "shard" + strconv.Itoa(int(id))
si := &shard{
id: id,
l: logger.Fetch(ctx, sName),
location: location,
}
opts := inverted.StoreOpts{
Path: location,
Logger: si.l,
Metrics: inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard": sName}))),
BatchWaitSec: flushTimeoutSeconds,
}
var err error
if si.store, err = inverted.NewStore(opts); err != nil {
return nil, err
}
return si, nil
}
func (s *shard) update(id []byte, property *propertyv1.Property) error {
pj, err := protojson.Marshal(property)
if err != nil {
return err
}
sourceField := index.NewBytesField(sourceFieldKey, pj)
sourceField.NoSort = true
sourceField.Store = true
entityField := index.NewBytesField(entityFieldKey, []byte(property.Id))
entityField.Index = true
groupField := index.NewBytesField(groupFieldKey, []byte(property.Metadata.Group))
groupField.Index = true
nameField := index.NewBytesField(nameFieldKey, []byte(property.Metadata.Name))
nameField.Index = true
doc := index.Document{
EntityValues: id,
Fields: []index.Field{entityField, groupField, nameField, sourceField},
}
for _, t := range property.Tags {
tv, err := pbv1.MarshalTagValue(t.Value)
if err != nil {
return err
}
tagField := index.NewBytesField(index.FieldKey{IndexRuleID: uint32(convert.HashStr(t.Key))}, tv)
tagField.Index = true
tagField.NoSort = true
doc.Fields = append(doc.Fields, tagField)
}
return s.store.UpdateSeriesBatch(index.Batch{
Documents: index.Documents{doc},
})
}
func (s *shard) delete(docID [][]byte) error {
return s.store.Delete(docID)
}
func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int,
) (data [][]byte, err error) {
tracer := query.GetTracer(ctx)
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "property.search")
span.Tagf("query", "%s", indexQuery.String())
span.Tagf("shard", "%d", s.id)
defer func() {
if data != nil {
span.Tagf("matched", "%d", len(data))
}
if err != nil {
span.Error(err)
}
span.Stop()
}()
}
ss, err := s.store.Search(ctx, projection, indexQuery, limit)
if err != nil {
return nil, err
}
if len(ss) == 0 {
return nil, nil
}
data = make([][]byte, 0, len(ss))
for _, s := range ss {
data = append(data, s.Fields[sourceField])
}
return data, nil
}