func()

in banyand/internal/storage/rotation.go [45:132]


// startRotationTask starts the background goroutine that drives segment
// rotation and, when retention is enabled, registers the retention task with
// the scheduler.
//
// The goroutine consumes nanosecond timestamps from d.tsEventCh until that
// channel is closed. For every timestamp it:
//   - flags d.rotationProcessOn for the duration of the processing,
//   - runs the retention task (if one was created),
//   - resets the index store of every segment whose End is before the event,
//   - creates the next segment when the event falls within newSegmentTimeGap
//     of the latest segment's End.
//
// If the segment controller's idleTimeout is at least one second, the same
// goroutine also closes idle segments every 10 minutes.
//
// It returns nil when retention is disabled; otherwise it returns the result
// of registering the retention task with d.scheduler.
func (d *database[T, O]) startRotationTask() error {
	opts := d.segmentController.getOptions()

	var retention *retentionTask[T, O]
	if !d.disableRetention {
		retention = newRetentionTask(d, opts.TTL)
	}

	go func() {
		// A nil channel is never ready in a select, so the idle check stays
		// disabled unless the ticker below is created.
		var idleTick <-chan time.Time
		if d.segmentController.idleTimeout >= time.Second {
			ticker := time.NewTicker(10 * time.Minute)
			defer ticker.Stop()
			idleTick = ticker.C
		}

		// rotate resets the indexes of segments that ended before ts and
		// creates the next segment when the event is close enough to the end
		// of the latest one.
		rotate := func(ts int64, eventTime time.Time) {
			// The original note on this call: passing true ensures the segments are open.
			segs, err := d.segmentController.segments(true)
			if err != nil {
				d.logger.Error().Err(err).Msg("failed to get segments")
				return
			}
			if len(segs) == 0 {
				return
			}
			// Release every reference obtained above once this pass is done.
			defer func() {
				for i := range segs {
					segs[i].DecRef()
				}
			}()

			for i := 0; i < len(segs); i++ {
				if segs[i].End.UnixNano() < ts {
					segs[i].index.store.Reset()
				}
			}

			// gap <= 0 means the event is from the future relative to the
			// latest segment; that segment is expected to be created directly
			// by a write event instead. A gap larger than newSegmentTimeGap
			// means it is too early to create the next segment.
			gap := segs[len(segs)-1].End.UnixNano() - ts
			if withinWindow := gap > 0 && gap <= newSegmentTimeGap; !withinWindow {
				return
			}

			d.incTotalRotationStarted(1)
			defer d.incTotalRotationFinished(1)

			nextStart := opts.SegmentInterval.nextTime(eventTime)
			d.logger.Info().Time("segment_start", nextStart).Time("event_time", eventTime).Msg("create new segment")
			if _, err = d.segmentController.create(nextStart); err != nil {
				d.logger.Error().Err(err).Msgf("failed to create new segment.")
				d.incTotalRotationErr(1)
			}
		}

		// handleEvent processes one timestamp event while rotationProcessOn
		// is set; the flag is cleared again when processing ends.
		handleEvent := func(ts int64) {
			d.rotationProcessOn.Store(true)
			defer d.rotationProcessOn.Store(false)

			eventTime := time.Unix(0, ts)
			if retention != nil {
				retention.run(eventTime, d.logger)
			}
			rotate(ts, eventTime)
		}

		for {
			select {
			case ts, open := <-d.tsEventCh:
				if !open {
					d.logger.Debug().Msg("tsEventCh closed")
					return
				}
				handleEvent(ts)
			case <-idleTick:
				d.logger.Debug().Msg("checking for idle segments")
				if closed := d.segmentController.closeIdleSegments(); closed > 0 {
					d.logger.Info().Int("count", closed).Msg("closed idle segments")
				}
			}
		}
	}()

	if retention == nil {
		return nil
	}
	return d.scheduler.Register("retention", retention.option, retention.expr, retention.run)
}