banyand/internal/storage/segment.go
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package storage

import (
"context"
"fmt"
"io/fs"
"path"
"path/filepath"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
// ErrExpiredData is returned when the data is expired.
var ErrExpiredData = errors.New("expired data")
// ErrSegmentClosed is returned when trying to access a closed segment.
var ErrSegmentClosed = errors.New("segment closed")
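
// segment is a reference-counted partition of the TSDB covering a fixed time
// range. It owns one TSTable per shard plus a shared series index, and it can
// be closed and lazily re-initialized while the database stays open.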
type segment[T TSTable, O any] struct {
metrics any
tsdbOpts *TSDBOpts[T, O]
l *logger.Logger
index *seriesIndex
sLst atomic.Pointer[[]*shard[T]]
indexMetrics *inverted.Metrics
position common.Position
timestamp.TimeRange
suffix string
location string
lastAccessed atomic.Int64
mu sync.Mutex
refCount int32
mustBeDeleted uint32
id segmentID
}
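
// openSegment constructs the segment stored at path, identified by suffix, and
// initializes it, opening its series index and any shards already on disk.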
func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime, endTime time.Time, path, suffix string,
) (s *segment[T, O], err error) {
suffixInteger, err := strconv.Atoi(suffix)
if err != nil {
return nil, err
}
p := common.GetPosition(ctx)
p.Segment = suffix
ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
return p
})
options := sc.getOptions()
id := generateSegID(options.SegmentInterval.Unit, suffixInteger)
s = &segment[T, O]{
id: id,
location: path,
suffix: suffix,
TimeRange: timestamp.NewSectionTimeRange(startTime, endTime),
position: p,
metrics: sc.metrics,
indexMetrics: sc.indexMetrics,
tsdbOpts: options,
}
s.l = logger.Fetch(ctx, s.String())
s.lastAccessed.Store(time.Now().UnixNano())
return s, s.initialize(ctx)
}
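
// loadShards reopens every shard directory found under the segment whose ID is
// smaller than shardNum; directories outside that range are ignored.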
func (s *segment[T, O]) loadShards(shardNum int) error {
return walkDir(s.location, shardPathPrefix, func(suffix string) error {
shardID, err := strconv.Atoi(suffix)
if err != nil {
return err
}
if shardID >= shardNum {
return nil
}
s.l.Info().Int("shard_id", shardID).Msg("loaded an existing shard")
_, err = s.createShardIfNotExist(common.ShardID(shardID))
return err
})
}
func (s *segment[T, O]) GetTimeRange() timestamp.TimeRange {
return s.TimeRange
}
func (s *segment[T, O]) Tables() (tt []T) {
sLst := s.sLst.Load()
if sLst != nil {
for _, s := range *sLst {
tt = append(tt, s.table)
}
}
return tt
}
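
// incRef records an access and takes a reference on the segment. If the
// segment has been closed (its reference count dropped to zero), it is
// re-initialized first.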
func (s *segment[T, O]) incRef(ctx context.Context) error {
s.lastAccessed.Store(time.Now().UnixNano())
if atomic.LoadInt32(&s.refCount) <= 0 {
return s.initialize(ctx)
}
atomic.AddInt32(&s.refCount, 1)
return nil
}
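
// initialize opens the series index and the on-disk shards and sets the
// reference count to one. It is a no-op when the segment is already open.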
func (s *segment[T, O]) initialize(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
if atomic.LoadInt32(&s.refCount) > 0 {
return nil
}
ctx = context.WithValue(ctx, logger.ContextKey, s.l)
ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
return s.position
})
sir, err := newSeriesIndex(ctx, s.location, s.tsdbOpts.SeriesIndexFlushTimeoutSeconds, s.tsdbOpts.SeriesIndexCacheMaxBytes, s.indexMetrics)
if err != nil {
return errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index controller failed").Error())
}
s.index = sir
err = s.loadShards(int(s.tsdbOpts.ShardNum))
if err != nil {
s.index.Close()
s.index = nil
return errors.Wrap(errOpenDatabase, errors.WithMessage(err, "load shards failed").Error())
}
atomic.StoreInt32(&s.refCount, 1)
s.l.Info().Stringer("seg", s).Msg("segment initialized")
return nil
}
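
// DecRef releases one reference. When the last reference is released, or when
// a closed segment has been marked for deletion, the underlying resources are
// cleaned up.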
func (s *segment[T, O]) DecRef() {
shouldCleanup := false
if atomic.LoadInt32(&s.refCount) <= 0 && atomic.LoadUint32(&s.mustBeDeleted) != 0 {
shouldCleanup = true
} else {
for {
current := atomic.LoadInt32(&s.refCount)
if current <= 0 {
return
}
if atomic.CompareAndSwapInt32(&s.refCount, current, current-1) {
shouldCleanup = current == 1
break
}
}
}
if !shouldCleanup {
return
}
s.performCleanup()
}
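
// performCleanup closes the series index and the shards, and removes the
// segment directory when the segment has been marked for deletion.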
func (s *segment[T, O]) performCleanup() {
s.mu.Lock()
defer s.mu.Unlock()
if atomic.LoadInt32(&s.refCount) > 0 && atomic.LoadUint32(&s.mustBeDeleted) == 0 {
return
}
deletePath := ""
if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
deletePath = s.location
}
if s.index != nil {
if err := s.index.Close(); err != nil {
s.l.Panic().Err(err).Msg("failed to close the series index")
}
s.index = nil
}
sLst := s.sLst.Load()
if sLst != nil {
for _, shard := range *sLst {
shard.close()
}
if deletePath == "" {
s.sLst.Store(&[]*shard[T]{})
}
}
if deletePath != "" {
lfs.MustRMAll(deletePath)
}
}
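
// delete marks the segment for deletion and drops one reference; the files are
// removed once the reference count reaches zero.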
func (s *segment[T, O]) delete() {
atomic.StoreUint32(&s.mustBeDeleted, 1)
s.DecRef()
}
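
// CreateTSTableIfNotExist returns the table of the shard with the given ID,
// creating the shard first if it does not exist yet.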
func (s *segment[T, O]) CreateTSTableIfNotExist(id common.ShardID) (T, error) {
if s, ok := s.getShard(id); ok {
return s.table, nil
}
s.mu.Lock()
defer s.mu.Unlock()
return s.createShardIfNotExist(id)
}
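
// createShardIfNotExist opens the shard with the given ID and appends it to
// the shard list. Callers must hold s.mu.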
func (s *segment[T, O]) createShardIfNotExist(id common.ShardID) (T, error) {
if s, ok := s.getShard(id); ok {
return s.table, nil
}
ctx := context.WithValue(context.Background(), logger.ContextKey, s.l)
ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
return s.position
})
so, err := s.openShard(ctx, id)
if err != nil {
var t T
return t, err
}
var shardList []*shard[T]
sLst := s.sLst.Load()
if sLst != nil {
shardList = *sLst
}
shardList = append(shardList, so)
s.sLst.Store(&shardList)
return so.table, nil
}
func (s *segment[T, O]) getShard(shardID common.ShardID) (*shard[T], bool) {
sLst := s.sLst.Load()
if sLst != nil {
for _, s := range *sLst {
if s.id == shardID {
return s, true
}
}
}
return nil, false
}
func (s *segment[T, O]) String() string {
return "SegID-" + s.suffix
}
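
// segmentController owns the sorted list of segments of a database and
// creates, opens, expires, and closes them according to the configured segment
// interval and TTL.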
type segmentController[T TSTable, O any] struct {
clock timestamp.Clock
metrics Metrics
opts *TSDBOpts[T, O]
l *logger.Logger
indexMetrics *inverted.Metrics
position common.Position
db string
stage string
location string
lst []*segment[T, O]
deadline atomic.Int64
idleTimeout time.Duration
optsMutex sync.RWMutex
sync.RWMutex
}
func newSegmentController[T TSTable, O any](ctx context.Context, location string,
l *logger.Logger, opts TSDBOpts[T, O], indexMetrics *inverted.Metrics, metrics Metrics,
idleTimeout time.Duration,
) *segmentController[T, O] {
clock, _ := timestamp.GetClock(ctx)
p := common.GetPosition(ctx)
return &segmentController[T, O]{
location: location,
opts: &opts,
l: l,
clock: clock,
position: p,
metrics: metrics,
indexMetrics: indexMetrics,
stage: p.Stage,
db: p.Database,
idleTimeout: idleTimeout,
}
}
func (sc *segmentController[T, O]) getOptions() *TSDBOpts[T, O] {
sc.optsMutex.RLock()
defer sc.optsMutex.RUnlock()
return sc.opts
}
func (sc *segmentController[T, O]) updateOptions(resourceOpts *commonv1.ResourceOpts) {
sc.optsMutex.Lock()
defer sc.optsMutex.Unlock()
si := MustToIntervalRule(resourceOpts.SegmentInterval)
if sc.opts.SegmentInterval.Unit != si.Unit {
sc.l.Panic().Msg("segment interval unit cannot be changed")
return
}
sc.opts.SegmentInterval = si
sc.opts.TTL = MustToIntervalRule(resourceOpts.Ttl)
sc.opts.ShardNum = resourceOpts.ShardNum
}
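
// selectSegments walks the segment list from newest to oldest and takes a
// reference on every segment overlapping the given time range; callers release
// the references with DecRef.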
func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange) (tt []Segment[T, O], err error) {
sc.RLock()
defer sc.RUnlock()
last := len(sc.lst) - 1
ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l)
for i := range sc.lst {
s := sc.lst[last-i]
if s.GetTimeRange().End.Before(timeRange.Start) {
break
}
if s.Overlapping(timeRange) {
if err = s.incRef(ctx); err != nil {
return nil, err
}
tt = append(tt, s)
}
}
return tt, nil
}
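
// createSegment returns the segment covering ts, creating it when necessary.
// Timestamps older than the retention deadline are rejected with ErrExpiredData.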
func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O], error) {
// Before the first run that removes old segments, the deadline is zero, so any segment can be created.
if sc.deadline.Load() > ts.UnixNano() {
return nil, ErrExpiredData
}
s, err := sc.create(ts)
if err != nil {
return nil, err
}
return s, s.incRef(context.WithValue(context.Background(), logger.ContextKey, sc.l))
}
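
// segments returns a snapshot of the segment list. With reopenClosed set,
// every segment is (re)initialized and referenced; otherwise only segments
// that are already open gain an extra reference.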
func (sc *segmentController[T, O]) segments(reopenClosed bool) (ss []*segment[T, O], err error) {
sc.RLock()
defer sc.RUnlock()
r := make([]*segment[T, O], len(sc.lst))
ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l)
for i := range sc.lst {
if reopenClosed {
if err = sc.lst[i].incRef(ctx); err != nil {
return nil, err
}
} else {
if atomic.LoadInt32(&sc.lst[i].refCount) > 0 {
atomic.AddInt32(&sc.lst[i].refCount, 1)
}
}
r[i] = sc.lst[i]
}
return r, nil
}
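
// closeIdleSegments releases one reference from every open segment whose last
// access is older than the idle timeout, drops the snapshot references, and
// returns the number of segments whose reference count reached zero.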
func (sc *segmentController[T, O]) closeIdleSegments() int {
maxIdleTime := sc.idleTimeout
now := time.Now().UnixNano()
idleThreshold := now - maxIdleTime.Nanoseconds()
segs, _ := sc.segments(false)
closedCount := 0
for _, seg := range segs {
lastAccess := seg.lastAccessed.Load()
// Only consider segments that have been idle for longer than the threshold
// and have active references (are not already closed)
if lastAccess < idleThreshold && atomic.LoadInt32(&seg.refCount) > 0 {
seg.DecRef()
}
seg.DecRef()
if atomic.LoadInt32(&seg.refCount) == 0 {
closedCount++
}
}
return closedCount
}
func (sc *segmentController[T, O]) format(tm time.Time) string {
switch sc.getOptions().SegmentInterval.Unit {
case HOUR:
return tm.Format(hourFormat)
case DAY:
return tm.Format(dayFormat)
}
panic("invalid interval unit")
}
func (sc *segmentController[T, O]) parse(value string) (time.Time, error) {
switch sc.getOptions().SegmentInterval.Unit {
case HOUR:
return time.ParseInLocation(hourFormat, value, time.Local)
case DAY:
return time.ParseInLocation(dayFormat, value, time.Local)
}
panic("invalid interval unit")
}
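
// open scans the controller's directory, removes segment folders that lack a
// valid metadata file, verifies the on-disk version, and loads the remaining
// segments.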
func (sc *segmentController[T, O]) open() error {
sc.Lock()
defer sc.Unlock()
emptySegments := make([]string, 0)
err := loadSegments(sc.location, segPathPrefix, sc, sc.getOptions().SegmentInterval, func(start, end time.Time) error {
suffix := sc.format(start)
segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, suffix))
metadataPath := path.Join(segmentPath, metadataFilename)
version, err := lfs.Read(metadataPath)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
emptySegments = append(emptySegments, segmentPath)
return nil
}
return err
}
if len(version) == 0 {
emptySegments = append(emptySegments, segmentPath)
return nil
}
if err = checkVersion(convert.BytesToString(version)); err != nil {
return err
}
_, err = sc.load(start, end, sc.location)
return err
})
if len(emptySegments) > 0 {
sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments found, removing them.")
for i := range emptySegments {
lfs.MustRMAll(emptySegments[i])
}
}
return err
}
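
// create returns the segment containing start, building a new one when none
// does. The new segment's start is aligned to the segment interval, its end is
// capped at the start of the next existing segment, and a metadata file with
// the current version is written before the segment is loaded.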
func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], error) {
sc.Lock()
defer sc.Unlock()
last := len(sc.lst) - 1
for i := range sc.lst {
s := sc.lst[last-i]
if s.Contains(start.UnixNano()) {
return s, nil
}
}
options := sc.getOptions()
start = options.SegmentInterval.Unit.standard(start)
var next *segment[T, O]
for _, s := range sc.lst {
if s.Contains(start.UnixNano()) {
return s, nil
}
if next == nil && s.Start.After(start) {
next = s
}
}
stdEnd := options.SegmentInterval.nextTime(start)
var end time.Time
if next != nil && next.Start.Before(stdEnd) {
end = next.Start
} else {
end = stdEnd
}
segPath := path.Join(sc.location, fmt.Sprintf(segTemplate, sc.format(start)))
lfs.MkdirPanicIfExist(segPath, DirPerm)
data := []byte(currentVersion)
metadataPath := filepath.Join(segPath, metadataFilename)
lf, err := lfs.CreateLockFile(metadataPath, FilePerm)
if err != nil {
logger.Panicf("cannot create lock file %s: %s", metadataPath, err)
}
n, err := lf.Write(data)
if err != nil {
logger.Panicf("cannot write metadata %s: %s", metadataPath, err)
}
if n != len(data) {
logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", metadataPath, n, len(data))
}
return sc.load(start, end, sc.location)
}
func (sc *segmentController[T, O]) sortLst() {
sort.Slice(sc.lst, func(i, j int) bool {
return sc.lst[i].id < sc.lst[j].id
})
}
func (sc *segmentController[T, O]) load(start, end time.Time, root string) (seg *segment[T, O], err error) {
suffix := sc.format(start)
segPath := path.Join(root, fmt.Sprintf(segTemplate, suffix))
ctx := common.SetPosition(context.WithValue(context.Background(), logger.ContextKey, sc.l), func(_ common.Position) common.Position {
return sc.position
})
seg, err = sc.openSegment(ctx, start, end, segPath, suffix)
if err != nil {
return nil, err
}
sc.lst = append(sc.lst, seg)
sc.sortLst()
return seg, nil
}
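
// remove deletes every segment whose time range lies entirely before the
// deadline and reports whether any segment was removed.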
func (sc *segmentController[T, O]) remove(deadline time.Time) (hasSegment bool, err error) {
ss, _ := sc.segments(false)
for _, s := range ss {
if s.Before(deadline) {
hasSegment = true
id := s.id
s.delete()
sc.Lock()
sc.removeSeg(id)
sc.Unlock()
sc.l.Info().Stringer("segment", s).Msg("removed a segment")
}
s.DecRef()
}
return hasSegment, err
}
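
// getExpiredSegmentsTimeRange returns the time range spanned by segments that
// have outlived the TTL; the range is left zero-valued when nothing expired.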
func (sc *segmentController[T, O]) getExpiredSegmentsTimeRange() *timestamp.TimeRange {
deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
timeRange := &timestamp.TimeRange{
IncludeStart: true,
IncludeEnd: false,
}
ss, _ := sc.segments(false)
for _, s := range ss {
if s.Before(deadline) {
if timeRange.Start.IsZero() {
timeRange.Start = s.Start
}
timeRange.End = s.End
}
s.DecRef()
}
return timeRange
}
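
// deleteExpiredSegments removes segments that both precede the TTL deadline
// and overlap the given time range, returning the number of segments deleted.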
func (sc *segmentController[T, O]) deleteExpiredSegments(timeRange timestamp.TimeRange) int64 {
deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
var count int64
ss, _ := sc.segments(false)
for _, s := range ss {
if s.Before(deadline) && s.Overlapping(timeRange) {
s.delete()
sc.Lock()
sc.removeSeg(s.id)
sc.Unlock()
count++
}
s.DecRef()
}
return count
}
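
// removeSeg drops the segment with the given ID from the list and resets the
// creation deadline to the start of the oldest remaining segment, or to zero
// when none remain.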
func (sc *segmentController[T, O]) removeSeg(segID segmentID) {
for i, b := range sc.lst {
if b.id == segID {
sc.lst = append(sc.lst[:i], sc.lst[i+1:]...)
if len(sc.lst) < 1 {
sc.deadline.Store(0)
} else {
sc.deadline.Store(sc.lst[0].Start.UnixNano())
}
break
}
}
}
func (sc *segmentController[T, O]) close() {
sc.Lock()
defer sc.Unlock()
for _, s := range sc.lst {
s.DecRef()
}
sc.lst = sc.lst[:0]
if sc.metrics != nil {
sc.metrics.DeleteAll()
}
}
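
// loadSegments discovers segment directories under root, sorts their start
// times chronologically, and invokes loadFn for each of them, deriving a
// segment's end from the next segment's start or, for the last one, from the
// interval rule.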
func loadSegments[T TSTable, O any](root, prefix string, parser *segmentController[T, O], intervalRule IntervalRule, loadFn func(start, end time.Time) error) error {
var startTimeLst []time.Time
if err := walkDir(
root,
prefix,
func(suffix string) error {
startTime, err := parser.parse(suffix)
if err != nil {
return err
}
startTimeLst = append(startTimeLst, startTime)
return nil
}); err != nil {
return err
}
sort.Slice(startTimeLst, func(i, j int) bool { return startTimeLst[i].Before(startTimeLst[j]) })
for i, start := range startTimeLst {
var end time.Time
if i < len(startTimeLst)-1 {
end = startTimeLst[i+1]
} else {
end = intervalRule.nextTime(start)
}
if err := loadFn(start, end); err != nil {
return err
}
}
return nil
}