banyand/internal/storage/storage.go (139 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package storage implements a time-series-based storage engine.
// It provides:
// - Partition data based on a time axis.
// - Sharding data based on a series id which represents a unique entity of stream/measure
// - Retrieving data based on index.Filter.
// - Cleaning expired data, or the data retention.
package storage
import (
"context"
"io"
"time"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
const (
shardPathPrefix = "shard"
shardTemplate = shardPathPrefix + "-%d"
metadataPath = "metadata"
segTemplate = "seg-%s"
segPathPrefix = "seg"
hourFormat = "2006010215"
dayFormat = "20060102"
// DirPerm is the permission of the directory.
DirPerm = 0o700
// SnapshotsDir is the directory for snapshots.
SnapshotsDir = "snapshots"
// DataDir is the directory for data.
DataDir = "data"
// FilePerm is the permission of the file.
FilePerm = 0o600
)
var (
// ErrUnknownShard indicates that the shard is not found.
ErrUnknownShard = errors.New("unknown shard")
errOpenDatabase = errors.New("fails to open the database")
lfs = fs.NewLocalFileSystemWithLogger(logger.GetLogger("storage"))
)
// SupplyTSDB allows getting a tsdb's runtime.
type SupplyTSDB[T TSTable] func() T
// IndexSearchOpts is the options for searching index.
type IndexSearchOpts struct {
Query index.Query
Order *index.OrderBy
TimeRange *timestamp.TimeRange
Projection []index.FieldKey
PreloadSize int
}
// FieldResult is the result of a field.
type FieldResult map[string][]byte
// FieldResultList is a list of FieldResult.
type FieldResultList []FieldResult
// SeriesData is the result of a series.
type SeriesData struct {
SeriesList pbv1.SeriesList
Fields FieldResultList
Timestamps []int64
Versions []int64
}
// IndexDB is the interface of index database.
type IndexDB interface {
Insert(docs index.Documents) error
Update(docs index.Documents) error
Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (SeriesData, [][]byte, error)
SearchWithoutSeries(ctx context.Context, opts IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error)
}
// TSDB allows listing and getting shard details.
type TSDB[T TSTable, O any] interface {
io.Closer
CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error)
SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error)
Tick(ts int64)
UpdateOptions(opts *commonv1.ResourceOpts)
TakeFileSnapshot(dst string) error
GetExpiredSegmentsTimeRange() *timestamp.TimeRange
DeleteExpiredSegments(timeRange timestamp.TimeRange) int64
}
// Segment is a time range of data.
type Segment[T TSTable, O any] interface {
DecRef()
GetTimeRange() timestamp.TimeRange
CreateTSTableIfNotExist(shardID common.ShardID) (T, error)
Tables() []T
Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error)
IndexDB() IndexDB
}
// TSTable is time series table.
type TSTable interface {
io.Closer
Collect(Metrics)
TakeFileSnapshot(dst string) error
}
// TSTableCreator creates a TSTable.
type TSTableCreator[T TSTable, O any] func(fileSystem fs.FileSystem, root string, position common.Position,
l *logger.Logger, timeRange timestamp.TimeRange, option O, metrics any) (T, error)
// Metrics is the interface of metrics.
type Metrics interface {
// DeleteAll deletes all metrics.
DeleteAll()
}
// IntervalUnit denotes the unit of a time point.
type IntervalUnit int
// Available IntervalUnits. HOUR and DAY are adequate for the APM scenario.
const (
HOUR IntervalUnit = iota
DAY
)
func (iu IntervalUnit) String() string {
switch iu {
case HOUR:
return "hour"
case DAY:
return "day"
}
panic("invalid interval unit")
}
func (iu IntervalUnit) standard(t time.Time) time.Time {
switch iu {
case HOUR:
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
case DAY:
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
panic("invalid interval unit")
}
// IntervalRule defines a length of two points in time.
type IntervalRule struct {
Unit IntervalUnit
Num int
}
func (ir IntervalRule) nextTime(current time.Time) time.Time {
switch ir.Unit {
case HOUR:
return current.Add(time.Hour * time.Duration(ir.Num))
case DAY:
return current.AddDate(0, 0, ir.Num)
}
panic("invalid interval unit")
}
func (ir IntervalRule) estimatedDuration() time.Duration {
switch ir.Unit {
case HOUR:
return time.Hour * time.Duration(ir.Num)
case DAY:
return 24 * time.Hour * time.Duration(ir.Num)
}
panic("invalid interval unit")
}
// MustToIntervalRule converts a commonv1.IntervalRule to IntervalRule.
func MustToIntervalRule(ir *commonv1.IntervalRule) (result IntervalRule) {
switch ir.Unit {
case commonv1.IntervalRule_UNIT_DAY:
result.Unit = DAY
case commonv1.IntervalRule_UNIT_HOUR:
result.Unit = HOUR
default:
logger.Panicf("unknown interval rule:%v", ir)
}
result.Num = int(ir.Num)
return result
}