in banyand/tsdb/bucket/bucket.go [101:152]
// Report registers a periodic task on the reporter's scheduler and returns
// the channel on which that task publishes the reporter's time-based Status.
//
// Capacity is the length of the [Start, End] range in nanoseconds and Volume
// is the time elapsed since Start in nanoseconds. The task runs at an interval
// of one sixteenth of tr.Duration(), but never more often than every 100ms.
// Once Volume reaches Capacity the final status is delivered, the channel is
// closed and the task returns false.
//
// errReporterClosed is returned when the scheduler is already closed, when the
// current time is after End, or when registering the task fails with
// timestamp.ErrSchedulerClosed. Any other registration error is returned as is.
func (tr *timeBasedReporter) Report() (Channel, error) {
	if tr.scheduler.Closed() {
		return nil, errReporterClosed
	}
	now := tr.clock.Now()
	if now.After(tr.End) {
		return nil, errReporterClosed
	}
	// A one-slot buffer lets the task publish without waiting for the consumer.
	ch := make(Channel, 1)
	interval := tr.Duration() >> 4
	if interval < 100*time.Millisecond {
		interval = 100 * time.Millisecond
	}
	ms := interval / time.Millisecond
	if err := tr.scheduler.Register(
		fmt.Sprintf("%s-%d", tr.name, tr.count.Add(1)),
		cron.Descriptor,
		fmt.Sprintf("@every %dms", ms),
		func(now time.Time, l *logger.Logger) bool {
			status := Status{
				Capacity: int(tr.End.UnixNano() - tr.Start.UnixNano()),
				Volume:   int(now.UnixNano() - tr.Start.UnixNano()),
			}
			// Compute the percentage once and guard the division: a zero-length
			// range (Start == End) has a zero Capacity, which would otherwise
			// panic with an integer divide-by-zero on every tick.
			var progress int
			switch {
			case status.Capacity != 0:
				progress = status.Volume * 100 / status.Capacity
			case status.Volume >= 0:
				// Zero capacity that has already been reached counts as complete.
				progress = 100
			}
			if e := l.Debug(); e.Enabled() {
				e.Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", progress).Msg("reporting a status")
			}
			select {
			case ch <- status:
			default:
				// The buffer still holds an unread status.
				// TODO: this is too complicated, we should not use the channel anymore.
				if status.Volume >= status.Capacity {
					// The final status must not be dropped, so block until the
					// consumer makes room for it.
					l.Warn().Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", progress).Msg("the end status must be reported")
					ch <- status
				} else {
					// Intermediate statuses are best-effort; a later tick supersedes this one.
					l.Warn().Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", progress).Msg("ignore a status")
				}
			}
			l.Info().Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", progress).Msg("reported a status")
			if status.Volume < status.Capacity {
				// Not finished yet: ask to be run again.
				return true
			}
			// The range is exhausted: signal completion to the consumer and
			// return false (presumably this makes the scheduler drop the task
			// — confirm against the scheduler implementation).
			close(ch)
			return false
		}); err != nil {
		close(ch)
		if errors.Is(err, timestamp.ErrSchedulerClosed) {
			return nil, errReporterClosed
		}
		return nil, err
	}
	return ch, nil
}