banyand/property/db.go (171 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package property
import (
"context"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/pkg/errors"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
// lockFilename is the name of the lock file that openDB creates inside the
// database directory.
const lockFilename = "lock"
var (
// lfs is the local file system used in this file to create the database
// directory, create the lock file and list directory entries. It is
// constructed with the "property" logger.
lfs = fs.NewLocalFileSystemWithLogger(logger.GetLogger("property"))
// propertyScope is the "property" sub-scope of the root observability scope.
propertyScope = observability.RootScope.SubScope("property")
)
// database is the property store rooted at a single directory. It tracks the
// shards loaded so far together with the lock file, logger and metrics
// registry it was opened with.
type database struct {
// lock is the lock file created by openDB inside location; close closes it.
lock fs.File
// omr is the metrics registry handed to openDB.
omr observability.MetricsRegistry
// logger is the "property" logger; loadShard places it into the context
// passed to newShard.
logger *logger.Logger
// sLst points at the slice of shards loaded so far. It is nil until the
// first shard has been loaded.
sLst atomic.Pointer[[]*shard]
// location is the cleaned root directory of the database.
location string
// flushInterval is passed to newShard, truncated to whole seconds.
flushInterval time.Duration
// closed is set by close; load and collect check it before doing work.
closed atomic.Bool
}
// openDB opens the property database rooted at location.
//
// It makes sure the directory exists, loads every "shard-<id>" sub-directory
// found there, creates the lock file inside the directory and registers
// db.collect with the metrics collector under the cleaned location.
// flushInterval and omr are stored on the returned database.
//
// If loading fails, every shard that was opened before the failure is closed
// so that its resources are not leaked; the load error, combined with any
// error from closing those shards, is returned. Failing to create the lock
// file panics.
func openDB(ctx context.Context, location string, flushInterval time.Duration, omr observability.MetricsRegistry) (*database, error) {
	loc := filepath.Clean(location)
	lfs.MkdirIfNotExist(loc, storage.DirPerm)
	l := logger.GetLogger("property")
	db := &database{
		location:      loc,
		logger:        l,
		omr:           omr,
		flushInterval: flushInterval,
	}
	if err := db.load(ctx); err != nil {
		// The database value is discarded on this path, so nothing else
		// could ever close the shards that load managed to open.
		if sLst := db.sLst.Load(); sLst != nil {
			for _, s := range *sLst {
				err = multierr.Append(err, s.close())
			}
		}
		return nil, err
	}
	db.logger.Info().Str("path", loc).Msg("initialized")
	lockPath := filepath.Join(loc, lockFilename)
	lock, err := lfs.CreateLockFile(lockPath, storage.FilePerm)
	if err != nil {
		logger.Panicf("cannot create lock file %s: %s", lockPath, err)
	}
	db.lock = lock
	observability.MetricsCollector.Register(loc, db.collect)
	return db, nil
}
// load loads a shard for every "shard-"-prefixed directory found under the
// database location. The text after the last "-" in the directory name is
// parsed as the shard id.
//
// It returns an error if the database is already closed, if a directory
// suffix is not an integer, or if loading a shard fails.
func (db *database) load(ctx context.Context) error {
	if db.closed.Load() {
		return errors.New("database is closed")
	}
	openShard := func(suffix string) error {
		shardNum, convErr := strconv.Atoi(suffix)
		if convErr != nil {
			return convErr
		}
		_, loadErr := db.loadShard(ctx, common.ShardID(shardNum))
		return loadErr
	}
	return walkDir(db.location, "shard-", openShard)
}
// update applies property under id to the shard identified by shardID,
// loading that shard first if it has not been loaded yet. Errors from loading
// the shard or from the shard's own update are returned unchanged.
func (db *database) update(ctx context.Context, shardID common.ShardID, id []byte, property *propertyv1.Property) error {
	target, loadErr := db.loadShard(ctx, shardID)
	if loadErr != nil {
		return loadErr
	}
	return target.update(id, property)
}
// delete forwards docIDs to the delete method of every loaded shard. Every
// shard is attempted even if an earlier one fails; the individual errors are
// combined into the returned error. It returns nil when no shard has been
// loaded.
func (db *database) delete(docIDs [][]byte) error {
	shards := db.sLst.Load()
	if shards == nil {
		return nil
	}
	var combined error
	for _, sd := range *shards {
		combined = multierr.Append(combined, sd.delete(docIDs))
	}
	return combined
}
// query builds an index query from req and runs it against every loaded
// shard, returning the per-shard results concatenated in shard-list order.
//
// req.Limit is passed to each shard search individually, so the combined
// result may hold more than req.Limit entries. The first search error aborts
// the query. When no shard has been loaded the result is nil with no error.
func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([][]byte, error) {
	iq, buildErr := inverted.BuildPropertyQuery(req, groupField, entityID)
	if buildErr != nil {
		return nil, buildErr
	}
	shards := db.sLst.Load()
	if shards == nil {
		return nil, nil
	}
	var matches [][]byte
	for _, sd := range *shards {
		hits, searchErr := sd.search(ctx, iq, int(req.Limit))
		if searchErr != nil {
			return nil, searchErr
		}
		matches = append(matches, hits...)
	}
	return matches, nil
}
// loadShard returns the shard with the given id. If the shard is already in
// the loaded list it is returned as is; otherwise it is created through
// newShard (with the database logger placed in the context and the flush
// interval truncated to whole seconds) and added to the list.
//
// The list is updated copy-on-write: a new slice is built and a new pointer
// is stored. Appending through the already-published pointer would overwrite
// the slice header that readers of db.sLst may be dereferencing concurrently.
//
// NOTE(review): two concurrent calls for the same not-yet-loaded id are not
// serialized here and could both call newShard; confirm callers serialize
// shard creation.
func (db *database) loadShard(ctx context.Context, id common.ShardID) (*shard, error) {
	var current []*shard
	if sLst := db.sLst.Load(); sLst != nil {
		current = *sLst
	}
	for _, s := range current {
		if s.id == id {
			return s, nil
		}
	}
	sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger), id, int64(db.flushInterval.Seconds()))
	if err != nil {
		return nil, err
	}
	// Build the successor list in fresh storage so the snapshot readers may
	// already hold is never modified.
	next := make([]*shard, len(current), len(current)+1)
	copy(next, current)
	next = append(next, sd)
	db.sLst.Store(&next)
	return sd, nil
}
// close marks the database as closed, closes every loaded shard and then
// closes the lock file. Only the first call does any work; later calls return
// nil. Errors from closing shards are combined and returned; the result of
// closing the lock file is not propagated.
func (db *database) close() error {
	if alreadyClosed := db.closed.Swap(true); alreadyClosed {
		return nil
	}
	var combined error
	if shards := db.sLst.Load(); shards != nil {
		for _, sd := range *shards {
			combined = multierr.Append(combined, sd.close())
		}
	}
	db.lock.Close()
	return combined
}
// collect calls CollectMetrics on the store of every loaded shard. It does
// nothing once the database has been closed or while no shard is loaded.
// openDB registers it with the metrics collector.
func (db *database) collect() {
	if db.closed.Load() {
		return
	}
	if shards := db.sLst.Load(); shards != nil {
		for _, sd := range *shards {
			sd.store.CollectMetrics()
		}
	}
}
// walkFn is the callback walkDir invokes for each matching directory. suffix
// is the part of the directory name that follows its last "-". A non-nil
// return value stops the walk.
type walkFn func(suffix string) error
// walkDir iterates over the entries returned by lfs.ReadDir(root) and calls
// wf for each directory whose name starts with prefix. wf receives the text
// after the last "-" in the name (the whole name if it contains no "-").
//
// The walk stops at the first error returned by wf, which is returned wrapped
// with the offending directory name.
func walkDir(root, prefix string, wf walkFn) error {
	for _, entry := range lfs.ReadDir(root) {
		if !entry.IsDir() {
			continue
		}
		name := entry.Name()
		if !strings.HasPrefix(name, prefix) {
			continue
		}
		// LastIndex yields -1 when there is no "-", which makes the slice
		// start at 0 and keeps the whole name.
		suffix := name[strings.LastIndex(name, "-")+1:]
		if err := wf(suffix); err != nil {
			return errors.WithMessagef(err, "failed to load: %s", name)
		}
	}
	return nil
}