banyand/tsdb/segment_ctrl.go (244 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package tsdb
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/pkg/errors"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type segmentController struct {
shardCtx context.Context
blockQueue bucket.Queue
clock timestamp.Clock
scheduler *timestamp.Scheduler
l *logger.Logger
location string
lst []*segment
segmentSize IntervalRule
blockSize IntervalRule
sync.RWMutex
}
func newSegmentController(shardCtx context.Context, location string, segmentSize, blockSize IntervalRule,
openedBlockSize, maxOpenedBlockSize int, l *logger.Logger, scheduler *timestamp.Scheduler,
) (*segmentController, error) {
clock, _ := timestamp.GetClock(shardCtx)
sc := &segmentController{
shardCtx: shardCtx,
location: location,
segmentSize: segmentSize,
blockSize: blockSize,
l: l,
clock: clock,
scheduler: scheduler,
}
var err error
sc.blockQueue, err = bucket.NewQueue(
l.Named("block-queue"),
openedBlockSize,
maxOpenedBlockSize,
scheduler,
func(ctx context.Context, id interface{}) error {
bsID := id.(BlockID)
seg := sc.get(bsID.SegID)
if seg == nil {
l.Warn().Int("segID", parseSuffix(bsID.SegID)).Msg("segment is absent")
return nil
}
l.Info().Int("blockID", parseSuffix(bsID.BlockID)).Msg("closing the block")
return seg.closeBlock(ctx, bsID.BlockID)
})
return sc, err
}
func (sc *segmentController) get(segID SectionID) *segment {
lst := sc.segments()
last := len(lst) - 1
for i := range lst {
s := lst[last-i]
if s.id == segID {
return s
}
}
return nil
}
func (sc *segmentController) span(timeRange timestamp.TimeRange) (ss []*segment) {
lst := sc.segments()
last := len(lst) - 1
for i := range lst {
s := lst[last-i]
if s.Overlapping(timeRange) {
ss = append(ss, s)
}
}
return ss
}
func (sc *segmentController) segments() (ss []*segment) {
sc.RLock()
defer sc.RUnlock()
r := make([]*segment, len(sc.lst))
copy(r, sc.lst)
return r
}
func (sc *segmentController) Current() (bucket.Reporter, error) {
now := sc.Standard(sc.clock.Now())
ns := uint64(now.UnixNano())
if b := func() bucket.Reporter {
sc.RLock()
defer sc.RUnlock()
for _, s := range sc.lst {
if s.Contains(ns) {
return s
}
}
return nil
}(); b != nil {
return b, nil
}
return sc.create(now)
}
func (sc *segmentController) Next() (bucket.Reporter, error) {
c, err := sc.Current()
if err != nil {
return nil, err
}
seg := c.(*segment)
reporter, err := sc.create(sc.segmentSize.nextTime(seg.Start))
if errors.Is(err, errEndOfSegment) {
return nil, bucket.ErrNoMoreBucket
}
return reporter, err
}
func (sc *segmentController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
event := sc.l.Info()
if prev != nil {
event.Stringer("prev", prev)
}
if next != nil {
event.Stringer("next", next)
}
event.Msg("move to the next segment")
}
func (sc *segmentController) Standard(t time.Time) time.Time {
switch sc.segmentSize.Unit {
case HOUR:
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
case DAY:
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
panic("invalid interval unit")
}
func (sc *segmentController) Format(tm time.Time) string {
switch sc.segmentSize.Unit {
case HOUR:
return tm.Format(hourFormat)
case DAY:
return tm.Format(dayFormat)
}
panic("invalid interval unit")
}
func (sc *segmentController) Parse(value string) (time.Time, error) {
switch sc.segmentSize.Unit {
case HOUR:
return time.ParseInLocation(hourFormat, value, time.Local)
case DAY:
return time.ParseInLocation(dayFormat, value, time.Local)
}
panic("invalid interval unit")
}
func (sc *segmentController) open() error {
sc.Lock()
defer sc.Unlock()
return loadSections(sc.location, segPathPrefix, sc, sc.segmentSize, func(start, end time.Time) error {
_, err := sc.load(start, end, sc.location)
if errors.Is(err, errEndOfSegment) {
return nil
}
return err
})
}
func (sc *segmentController) create(start time.Time) (*segment, error) {
sc.Lock()
defer sc.Unlock()
start = sc.Standard(start)
var next *segment
for _, s := range sc.lst {
if s.Contains(uint64(start.UnixNano())) {
return s, nil
}
if next == nil && s.Start.After(start) {
next = s
}
}
stdEnd := sc.segmentSize.nextTime(start)
var end time.Time
if next != nil && next.Start.Before(stdEnd) {
end = next.Start
} else {
end = stdEnd
}
if err := mkdirIfNotExist(segTemplate, sc.location, sc.Format(start)); err != nil {
return nil, err
}
return sc.load(start, end, sc.location)
}
func (sc *segmentController) sortLst() {
sort.Slice(sc.lst, func(i, j int) bool {
return sc.lst[i].id < sc.lst[j].id
})
}
// nolint: contextcheck
func (sc *segmentController) load(start, end time.Time, root string) (seg *segment, err error) {
suffix := sc.Format(start)
seg, err = openSegment(common.SetPosition(sc.shardCtx, func(p common.Position) common.Position {
p.Segment = suffix
return p
}), start, end, fmt.Sprintf(segTemplate, root, suffix), suffix, sc.segmentSize, sc.blockSize, sc.blockQueue, sc.scheduler)
if err != nil {
return nil, err
}
sc.lst = append(sc.lst, seg)
sc.sortLst()
return seg, nil
}
func (sc *segmentController) remove(ctx context.Context, deadline time.Time) (err error) {
sc.l.Info().Time("deadline", deadline).Msg("start to remove before deadline")
for _, s := range sc.segments() {
if s.End.Before(deadline) || s.Contains(uint64(deadline.UnixNano())) {
if e := sc.l.Debug(); e.Enabled() {
e.Stringer("segment", s).Msg("start to remove data in a segment")
}
err = multierr.Append(err, s.blockController.remove(ctx, deadline))
if s.End.Before(deadline) {
sc.Lock()
if errDel := s.delete(ctx); errDel != nil {
err = multierr.Append(err, errDel)
} else {
sc.removeSeg(s.id)
}
sc.Unlock()
}
}
}
return err
}
func (sc *segmentController) removeSeg(segID SectionID) {
for i, b := range sc.lst {
if b.id == segID {
sc.lst = append(sc.lst[:i], sc.lst[i+1:]...)
break
}
}
}
func (sc *segmentController) close(ctx context.Context) (err error) {
sc.Lock()
defer sc.Unlock()
for _, s := range sc.lst {
err = multierr.Append(err, s.close(ctx))
}
sc.lst = sc.lst[:0]
return err
}