banyand/tsdb/block_ctrl.go (303 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package tsdb import ( "context" "fmt" "sort" "sync" "time" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type blockController struct { segCtx context.Context blockQueue bucket.Queue clock timestamp.Clock scheduler *timestamp.Scheduler l *logger.Logger segTimeRange timestamp.TimeRange segSuffix string location string lst []*block blockSize IntervalRule sync.RWMutex segID SectionID } func newBlockController(segCtx context.Context, segID SectionID, segSuffix, location string, segTimeRange timestamp.TimeRange, blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue, scheduler *timestamp.Scheduler, ) *blockController { clock, _ := timestamp.GetClock(segCtx) return &blockController{ segCtx: segCtx, segID: segID, segSuffix: segSuffix, location: location, blockSize: blockSize, segTimeRange: segTimeRange, blockQueue: blockQueue, l: l, clock: clock, scheduler: scheduler, } } func (bc *blockController) Current() (bucket.Reporter, error) { now := bc.Standard(bc.clock.Now()) ns := uint64(now.UnixNano()) if b := func() *block { bc.RLock() defer bc.RUnlock() for _, s := range bc.lst { if s.Contains(ns) { return s } } return nil }(); b != nil { if err := b.openSafely(); err != nil { return nil, err } return b, nil } return bc.newHeadBlock(now) } func (bc *blockController) Next() (bucket.Reporter, error) { c, err := bc.Current() if err != nil { return nil, err } b := c.(*block) return bc.newHeadBlock(bc.blockSize.nextTime(b.Start)) } func (bc *blockController) newHeadBlock(now time.Time) (*block, error) { b, err := bc.create(now) if err != nil { return nil, err } return b, nil } func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) { event := bc.l.Info() if prev != nil { event.Stringer("prev", prev) b := prev.(*block) ctx, cancel := context.WithTimeout(context.Background(), defaultEnqueueTimeout) defer cancel() if err := bc.blockQueue.Push(ctx, BlockID{ SegID: bc.segID, BlockID: b.blockID, }, nil); err != nil { bc.l.Error().Err(err).Msg("failed to push a expired head block to the queue") ctxClosing, cancelClosing := context.WithTimeout(context.Background(), defaultEnqueueTimeout) defer cancelClosing() b.close(ctxClosing) } } if next != nil { event.Stringer("next", next) } event.Msg("move to the next block") } func (bc *blockController) Standard(t time.Time) time.Time { switch bc.blockSize.Unit { case HOUR: return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()) case DAY: return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) } panic("invalid interval unit") } func (bc *blockController) Format(tm time.Time) string { switch bc.blockSize.Unit { case HOUR: return tm.Format(hourFormat) case DAY: return tm.Format(dayFormat) } panic("invalid interval unit") } func (bc *blockController) Parse(value string) (time.Time, error) { switch bc.blockSize.Unit { case HOUR: return time.ParseInLocation(hourFormat, value, time.Local) case DAY: return time.ParseInLocation(dayFormat, value, time.Local) } panic("invalid interval unit") } func (bc *blockController) span(ctx context.Context, timeRange timestamp.TimeRange) ([]blockDelegate, error) { bb := bc.search(func(b *block) bool { return b.Overlapping(timeRange) }) if bb == nil { return nil, nil } dd := make([]blockDelegate, len(bb)) for i, b := range bb { d, err := b.delegate(ctx) if err != nil { return nil, err } dd[i] = d } return dd, nil } func (bc *blockController) get(ctx context.Context, blockID SectionID) (blockDelegate, error) { b := bc.getBlock(blockID) if b != nil { return b.delegate(ctx) } return nil, nil } func (bc *blockController) getBlock(blockID SectionID) *block { bb := bc.search(func(b *block) bool { return b.blockID == blockID }) if len(bb) > 0 { return bb[0] } return nil } func (bc *blockController) blocks() []*block { bc.RLock() defer bc.RUnlock() r := make([]*block, len(bc.lst)) copy(r, bc.lst) return r } func (bc *blockController) search(matcher func(*block) bool) (bb []*block) { lst := bc.blocks() last := len(lst) - 1 for i := range lst { b := lst[last-i] if matcher(b) { bb = append(bb, b) } } return bb } func (bc *blockController) closeBlock(ctx context.Context, blockID SectionID) error { bc.RLock() b := bc.getBlock(blockID) bc.RUnlock() if b == nil { return nil } return b.close(ctx) } func (bc *blockController) open() error { bc.Lock() defer bc.Unlock() return loadSections(bc.location, blockPathPrefix, bc, bc.blockSize, func(start, end time.Time) error { _, err := bc.load(start, end, bc.location) return err }) } func (bc *blockController) create(start time.Time) (*block, error) { start = bc.Standard(start) if start.Before(bc.segTimeRange.Start) { start = bc.segTimeRange.Start } if !start.Before(bc.segTimeRange.End) { return nil, bucket.ErrNoMoreBucket } bc.Lock() defer bc.Unlock() var next *block for _, s := range bc.lst { if s.Start.Equal(start) { return s, nil } if next == nil && s.Start.After(start) { next = s } } stdEnd := bc.blockSize.nextTime(start) var end time.Time if next != nil && next.Start.Before(stdEnd) { end = next.Start } else { end = stdEnd } if end.After(bc.segTimeRange.End) { end = bc.segTimeRange.End } if err := mkdirIfNotExist(blockTemplate, bc.location, bc.Format(start)); err != nil { return nil, err } b, err := bc.load(start, end, bc.location) if err != nil { return nil, err } if err = b.openSafely(); err != nil { return nil, err } return b, nil } // nolint: contextcheck func (bc *blockController) load(startTime, endTime time.Time, root string) (b *block, err error) { suffix := bc.Format(startTime) if b, err = newBlock( common.SetPosition(bc.segCtx, func(p common.Position) common.Position { p.Block = suffix return p }), blockOpts{ segID: bc.segID, segSuffix: bc.segSuffix, path: fmt.Sprintf(blockTemplate, root, suffix), timeRange: timestamp.NewSectionTimeRange(startTime, endTime), suffix: suffix, blockSize: bc.blockSize, queue: bc.blockQueue, scheduler: bc.scheduler, }); err != nil { return nil, err } bc.lst = append(bc.lst, b) bc.sortLst() return b, nil } func (bc *blockController) sortLst() { sort.Slice(bc.lst, func(i, j int) bool { return bc.lst[i].blockID < bc.lst[j].blockID }) } func (bc *blockController) close(ctx context.Context) (err error) { bc.Lock() defer bc.Unlock() for _, s := range bc.lst { err = multierr.Append(err, s.close(ctx)) } bc.lst = bc.lst[:0] return err } func (bc *blockController) remove(ctx context.Context, deadline time.Time) (err error) { for _, b := range bc.blocks() { if b.End.Before(deadline) { if e := bc.l.Debug(); e.Enabled() { e.Stringer("block", b).Msg("start to remove data in a block") } bc.Lock() if errDel := b.delete(ctx); errDel != nil { err = multierr.Append(err, errDel) } else { b.queue.Remove(BlockID{ BlockID: b.blockID, SegID: b.segID, }) bc.removeBlock(b.blockID) } bc.Unlock() } } return err } func (bc *blockController) removeBlock(blockID SectionID) { for i, b := range bc.lst { if b.blockID == blockID { bc.lst = append(bc.lst[:i], bc.lst[i+1:]...) break } } }