in banyand/internal/storage/rotation.go [45:132]
func (d *database[T, O]) startRotationTask() error {
options := d.segmentController.getOptions()
var rt *retentionTask[T, O]
if !d.disableRetention {
rt = newRetentionTask(d, options.TTL)
}
go func(rt *retentionTask[T, O]) {
var idleCheckTicker *time.Ticker
var idleCheckC <-chan time.Time
// Only create the ticker if idleTimeout is at least 1 second
if d.segmentController.idleTimeout >= time.Second {
idleCheckTicker = time.NewTicker(10 * time.Minute)
idleCheckC = idleCheckTicker.C
defer func() {
if idleCheckTicker != nil {
idleCheckTicker.Stop()
}
}()
}
for {
select {
case ts, ok := <-d.tsEventCh:
if !ok {
d.logger.Debug().Msg("tsEventCh closed")
return
}
func(ts int64) {
d.rotationProcessOn.Store(true)
defer d.rotationProcessOn.Store(false)
t := time.Unix(0, ts)
if rt != nil {
rt.run(t, d.logger)
}
func() {
ss, err := d.segmentController.segments(true) // Ensure segments are open
if err != nil {
d.logger.Error().Err(err).Msg("failed to get segments")
return
}
if len(ss) == 0 {
return
}
defer func() {
for i := 0; i < len(ss); i++ {
ss[i].DecRef()
}
}()
for i := range ss {
if ss[i].End.UnixNano() < ts {
ss[i].index.store.Reset()
}
}
latest := ss[len(ss)-1]
gap := latest.End.UnixNano() - ts
// gap <=0 means the event is from the future
// the segment will be created by a written event directly
if gap <= 0 || gap > newSegmentTimeGap {
return
}
d.incTotalRotationStarted(1)
defer d.incTotalRotationFinished(1)
start := options.SegmentInterval.nextTime(t)
d.logger.Info().Time("segment_start", start).Time("event_time", t).Msg("create new segment")
_, err = d.segmentController.create(start)
if err != nil {
d.logger.Error().Err(err).Msgf("failed to create new segment.")
d.incTotalRotationErr(1)
}
}()
}(ts)
case <-idleCheckC:
func() {
d.logger.Debug().Msg("checking for idle segments")
closedCount := d.segmentController.closeIdleSegments()
if closedCount > 0 {
d.logger.Info().Int("count", closedCount).Msg("closed idle segments")
}
}()
}
}
}(rt)
if rt == nil {
return nil
}
return d.scheduler.Register("retention", rt.option, rt.expr, rt.run)
}