banyand/internal/storage/rotation.go (144 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package storage
import (
"time"
"github.com/robfig/cron/v3"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
var (
creationGap = time.Hour
newSegmentTimeGap = creationGap.Nanoseconds()
timeEventSnapDuration = (10 * time.Minute).Nanoseconds()
)
func (d *database[T, O]) Tick(ts int64) {
if (ts - timeEventSnapDuration) < d.latestTickTime.Load() {
return
}
d.latestTickTime.Store(ts)
select {
case d.tsEventCh <- ts:
default:
}
}
func (d *database[T, O]) startRotationTask() error {
options := d.segmentController.getOptions()
var rt *retentionTask[T, O]
if !d.disableRetention {
rt = newRetentionTask(d, options.TTL)
}
go func(rt *retentionTask[T, O]) {
var idleCheckTicker *time.Ticker
var idleCheckC <-chan time.Time
// Only create the ticker if idleTimeout is at least 1 second
if d.segmentController.idleTimeout >= time.Second {
idleCheckTicker = time.NewTicker(10 * time.Minute)
idleCheckC = idleCheckTicker.C
defer func() {
if idleCheckTicker != nil {
idleCheckTicker.Stop()
}
}()
}
for {
select {
case ts, ok := <-d.tsEventCh:
if !ok {
d.logger.Debug().Msg("tsEventCh closed")
return
}
func(ts int64) {
d.rotationProcessOn.Store(true)
defer d.rotationProcessOn.Store(false)
t := time.Unix(0, ts)
if rt != nil {
rt.run(t, d.logger)
}
func() {
ss, err := d.segmentController.segments(true) // Ensure segments are open
if err != nil {
d.logger.Error().Err(err).Msg("failed to get segments")
return
}
if len(ss) == 0 {
return
}
defer func() {
for i := 0; i < len(ss); i++ {
ss[i].DecRef()
}
}()
for i := range ss {
if ss[i].End.UnixNano() < ts {
ss[i].index.store.Reset()
}
}
latest := ss[len(ss)-1]
gap := latest.End.UnixNano() - ts
// gap <=0 means the event is from the future
// the segment will be created by a written event directly
if gap <= 0 || gap > newSegmentTimeGap {
return
}
d.incTotalRotationStarted(1)
defer d.incTotalRotationFinished(1)
start := options.SegmentInterval.nextTime(t)
d.logger.Info().Time("segment_start", start).Time("event_time", t).Msg("create new segment")
_, err = d.segmentController.create(start)
if err != nil {
d.logger.Error().Err(err).Msgf("failed to create new segment.")
d.incTotalRotationErr(1)
}
}()
}(ts)
case <-idleCheckC:
func() {
d.logger.Debug().Msg("checking for idle segments")
closedCount := d.segmentController.closeIdleSegments()
if closedCount > 0 {
d.logger.Info().Int("count", closedCount).Msg("closed idle segments")
}
}()
}
}
}(rt)
if rt == nil {
return nil
}
return d.scheduler.Register("retention", rt.option, rt.expr, rt.run)
}
type retentionTask[T TSTable, O any] struct {
database *database[T, O]
running chan struct{}
expr string
option cron.ParseOption
duration time.Duration
}
func newRetentionTask[T TSTable, O any](database *database[T, O], ttl IntervalRule) *retentionTask[T, O] {
return &retentionTask[T, O]{
database: database,
option: cron.Minute | cron.Hour,
// Remove data which is
expr: "5 0",
duration: ttl.estimatedDuration(),
running: make(chan struct{}, 1),
}
}
func (rc *retentionTask[T, O]) run(now time.Time, l *logger.Logger) bool {
select {
case rc.running <- struct{}{}:
default:
return true
}
defer func() {
<-rc.running
}()
rc.database.incTotalRetentionStarted(1)
defer rc.database.incTotalRetentionFinished(1)
deadline := now.Add(-rc.duration)
start := time.Now()
hasData, err := rc.database.segmentController.remove(deadline)
if hasData {
rc.database.incTotalRetentionHasData(1)
rc.database.incTotalRetentionHasDataLatency(time.Since(start).Seconds())
}
if err != nil {
l.Error().Err(err)
rc.database.incTotalRetentionErr(1)
}
return true
}