banyand/tsdb/segment.go (113 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package tsdb
import (
"context"
"errors"
"os"
"strconv"
"sync"
"time"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var errEndOfSegment = errors.New("reached the end of the segment")
type segment struct {
globalIndex kv.Store
bucket.Reporter
l *logger.Logger
blockController *blockController
blockManageStrategy *bucket.Strategy
position common.Position
timestamp.TimeRange
path string
suffix string
closeOnce sync.Once
id SectionID
}
func openSegment(ctx context.Context, startTime, endTime time.Time, path, suffix string,
segmentSize, blockSize IntervalRule, blockQueue bucket.Queue, scheduler *timestamp.Scheduler,
) (s *segment, err error) {
suffixInteger, err := strconv.Atoi(suffix)
if err != nil {
return nil, err
}
id := GenerateInternalID(segmentSize.Unit, suffixInteger)
timeRange := timestamp.NewSectionTimeRange(startTime, endTime)
s = &segment{
id: id,
path: path,
suffix: suffix,
TimeRange: timeRange,
position: common.GetPosition(ctx),
}
l := logger.Fetch(ctx, s.String())
s.l = l
segCtx := context.WithValue(ctx, logger.ContextKey, l)
clock, segCtx := timestamp.GetClock(segCtx)
s.blockController = newBlockController(segCtx, id, suffix, path, timeRange, blockSize, l, blockQueue, scheduler)
s.Reporter = bucket.NewTimeBasedReporter(s.String(), timeRange, clock, scheduler)
err = s.blockController.open()
if err != nil {
return nil, err
}
indexPath, err := mkdir(globalIndexTemplate, path)
if err != nil {
return nil, err
}
o := ctx.Value(OptionsKey)
if o != nil {
options := o.(DatabaseOpts)
if options.EnableGlobalIndex {
memSize := options.GlobalIndexMemSize
if memSize == 0 {
memSize = defaultKVMemorySize
}
if s.globalIndex, err = kv.OpenStore(
indexPath,
kv.StoreWithLogger(s.l),
kv.StoreWithMemTableSize(int64(memSize)),
); err != nil {
return nil, err
}
}
}
if !s.End.After(clock.Now()) {
return
}
s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l))
if err != nil {
return nil, err
}
s.blockManageStrategy.Run()
return s, nil
}
func (s *segment) close(ctx context.Context) (err error) {
s.closeOnce.Do(func() {
if err = s.blockController.close(ctx); err != nil {
return
}
if s.globalIndex != nil {
if err = s.globalIndex.Close(); err != nil {
return
}
}
if s.blockManageStrategy != nil {
s.blockManageStrategy.Close()
}
})
return nil
}
func (s *segment) closeBlock(ctx context.Context, id SectionID) error {
return s.blockController.closeBlock(ctx, id)
}
func (s *segment) delete(ctx context.Context) error {
if err := s.close(ctx); err != nil {
return err
}
return os.RemoveAll(s.path)
}
func (s *segment) String() string {
return "SegID-" + s.suffix
}