func()

in banyand/tsdb/bucket/bucket.go [101:152]


// Report registers a periodic task on the scheduler that publishes how far
// time has advanced through the reporter's Start..End range, and returns the
// channel those statuses are published on.
//
// Each Status carries Capacity (End - Start) and Volume (tick time - Start),
// both in nanoseconds. The task is scheduled every 1/16 of tr.Duration(), but
// never more often than every 100ms. Intermediate statuses are dropped when
// the channel's single-slot buffer is full; the final status
// (Volume >= Capacity) is always delivered with a blocking send, after which
// the channel is closed.
//
// It returns errReporterClosed when the scheduler is closed, when the clock is
// already past End, or when registration fails with
// timestamp.ErrSchedulerClosed. Any other registration error is returned
// unchanged.
func (tr *timeBasedReporter) Report() (Channel, error) {
	if tr.scheduler.Closed() {
		return nil, errReporterClosed
	}
	if tr.clock.Now().After(tr.End) {
		return nil, errReporterClosed
	}
	ch := make(Channel, 1)
	// Tick 16 times over the whole range, with a floor of 100ms.
	interval := tr.Duration() >> 4
	if interval < 100*time.Millisecond {
		interval = 100 * time.Millisecond
	}
	err := tr.scheduler.Register(
		fmt.Sprintf("%s-%d", tr.name, tr.count.Add(1)),
		cron.Descriptor,
		fmt.Sprintf("@every %dms", interval.Milliseconds()),
		func(now time.Time, l *logger.Logger) bool {
			status := Status{
				Capacity: int(tr.End.UnixNano() - tr.Start.UnixNano()),
				Volume:   int(now.UnixNano() - tr.Start.UnixNano()),
			}
			// Compute the percentage once. An empty range (Start == End) has a
			// zero capacity; report 0 instead of panicking on the division.
			progress := 0
			if status.Capacity != 0 {
				progress = status.Volume * 100 / status.Capacity
			}
			if e := l.Debug(); e.Enabled() {
				e.Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", progress).Msg("reporting a status")
			}
			select {
			case ch <- status:
			default:
				// The buffer still holds an unread status.
				// TODO: this is too complicated, we should not use the channel anymore.
				if status.Volume >= status.Capacity {
					l.Warn().Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", progress).Msg("the end status must be reported")
					// Blocks this callback until the receiver drains the channel,
					// so that the final status is never lost.
					ch <- status
				} else {
					l.Warn().Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", progress).Msg("ignore a status")
				}
			}
			l.Info().Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", progress).Msg("reported a status")
			if status.Volume < status.Capacity {
				return true
			}
			// The range is exhausted: signal completion to the receiver.
			// NOTE(review): returning false presumably tells the scheduler to
			// stop invoking this task — confirm against the scheduler contract.
			close(ch)
			return false
		})
	if err != nil {
		close(ch)
		if errors.Is(err, timestamp.ErrSchedulerClosed) {
			return nil, errReporterClosed
		}
		return nil, err
	}
	return ch, nil
}