banyand/tsdb/bucket/bucket.go (110 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package bucket implements a rolling bucket system. package bucket import ( "errors" "fmt" "sync/atomic" "time" "github.com/robfig/cron/v3" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var errReporterClosed = errors.New("reporter is closed") // Controller defines the provider of a Reporter. type Controller interface { Current() (Reporter, error) Next() (Reporter, error) OnMove(prev, next Reporter) } // Status is a sample of the Reporter's status. type Status struct { Capacity int Volume int } // Channel reports the status of a Reporter. type Channel chan Status // Reporter allows reporting status to its supervisor. type Reporter interface { // TODO: refactor Report to return a status. It's too complicated to return a channel Report() (Channel, error) String() string } var ( _ Reporter = (*dummyReporter)(nil) _ Reporter = (*timeBasedReporter)(nil) // DummyReporter is a special Reporter to avoid nil errors. DummyReporter = &dummyReporter{} ) type dummyReporter struct{} func (*dummyReporter) Report() (Channel, error) { return nil, errReporterClosed } func (*dummyReporter) Stop() { } func (*dummyReporter) String() string { return "dummy-reporter" } type timeBasedReporter struct { clock timestamp.Clock scheduler *timestamp.Scheduler count *atomic.Uint32 timestamp.TimeRange name string } // NewTimeBasedReporter returns a Reporter which sends report based on time. func NewTimeBasedReporter(name string, timeRange timestamp.TimeRange, clock timestamp.Clock, scheduler *timestamp.Scheduler) Reporter { if timeRange.End.Before(clock.Now()) { return DummyReporter } t := &timeBasedReporter{ TimeRange: timeRange, scheduler: scheduler, clock: clock, name: name, count: &atomic.Uint32{}, } return t } func (tr *timeBasedReporter) Report() (Channel, error) { if tr.scheduler.Closed() { return nil, errReporterClosed } now := tr.clock.Now() if now.After(tr.End) { return nil, errReporterClosed } ch := make(Channel, 1) interval := tr.Duration() >> 4 if interval < 100*time.Millisecond { interval = 100 * time.Millisecond } ms := interval / time.Millisecond if err := tr.scheduler.Register( fmt.Sprintf("%s-%d", tr.name, tr.count.Add(1)), cron.Descriptor, fmt.Sprintf("@every %dms", ms), func(now time.Time, l *logger.Logger) bool { status := Status{ Capacity: int(tr.End.UnixNano() - tr.Start.UnixNano()), Volume: int(now.UnixNano() - tr.Start.UnixNano()), } if e := l.Debug(); e.Enabled() { e.Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", status.Volume*100/status.Capacity).Msg("reporting a status") } select { case ch <- status: default: // TODO: this's too complicated, we should not use the channel anymore. if status.Volume >= status.Capacity { l.Warn().Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", status.Volume*100/status.Capacity).Msg("the end status must be reported") ch <- status } else { l.Warn().Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", status.Volume*100/status.Capacity).Msg("ignore a status") } } l.Info().Int("volume", status.Volume).Int("capacity", status.Capacity).Int("progress%", status.Volume*100/status.Capacity).Msg("reported a status") if status.Volume < status.Capacity { return true } close(ch) return false }); err != nil { close(ch) if errors.Is(err, timestamp.ErrSchedulerClosed) { return nil, errReporterClosed } return nil, err } return ch, nil }