refresh/fetcher.go (92 lines of code) (raw):

package refresh

import (
	"context"
	"time"
)

// Default bounds for the polling interval. NewFetcher falls back to these
// when the corresponding argument is left unset.
const (
	DefaultMinInterval = 4 * time.Second
	DefaultMaxInterval = 1024 * time.Second
)

// Fetcher fetches data at regular intervals. The interval will vary within the range of minInterval and
// maxInterval. When no diff is observed after a fetch, the interval doubles (subject to the maximum interval).
// When a diff is observed, the interval resets to the minimum. The interval can be made unchanging by setting
// minInterval and maxInterval to the same desired value.
type Fetcher[T equaler[T]] struct {
	// fetchFunc retrieves the latest value.
	fetchFunc func(context.Context) (T, error)

	// cache is the most recent successfully fetched value; new results are
	// compared against it to decide whether a diff occurred.
	cache T

	// minInterval and maxInterval bound currentInterval.
	minInterval time.Duration
	maxInterval time.Duration

	// currentInterval is the delay applied to the ticker after each tick.
	currentInterval time.Duration

	// ticker drives the periodic fetch. When nil at Start, a timed tick
	// provider is created from currentInterval.
	ticker TickProvider

	// consumeFunc, when non-nil, receives the initial value and every value
	// that differs from the cached one.
	consumeFunc func(T) error

	// logger receives progress and error messages.
	logger Logger
}

// NewFetcher creates a new Fetcher.
//
// If minInterval is zero or negative, it defaults to DefaultMinInterval
// (4 seconds). If maxInterval is zero, it defaults to DefaultMaxInterval
// (1024 seconds). A maxInterval smaller than minInterval is raised to
// minInterval, which yields a fixed polling interval.
//
// consumeFunc may be nil, in which case fetched values are only cached.
func NewFetcher[T equaler[T]](
	fetchFunc func(context.Context) (T, error),
	minInterval time.Duration,
	maxInterval time.Duration,
	consumeFunc func(T) error,
	logger Logger,
) *Fetcher[T] {
	// A negative interval is never a meaningful polling period, so treat it
	// the same as "unset" rather than handing it to the ticker.
	if minInterval <= 0 {
		minInterval = DefaultMinInterval
	}
	if maxInterval == 0 {
		maxInterval = DefaultMaxInterval
	}
	maxInterval = max(minInterval, maxInterval)

	return &Fetcher[T]{
		fetchFunc:       fetchFunc,
		minInterval:     minInterval,
		maxInterval:     maxInterval,
		currentInterval: minInterval,
		consumeFunc:     consumeFunc,
		logger:          logger,
	}
}

// Start launches the fetch loop in a new goroutine and returns immediately.
//
// The goroutine performs one fetch right away, then fetches again on every
// tick until ctx is done. Errors from the fetch and consume functions are
// logged and do not stop the loop.
func (f *Fetcher[T]) Start(ctx context.Context) {
	go f.runLoop(ctx)
}

// runLoop performs the initial fetch and then polls on every tick until ctx
// is cancelled.
func (f *Fetcher[T]) runLoop(ctx context.Context) {
	// do an initial fetch
	res, err := f.fetchFunc(ctx)
	if err != nil {
		// The value returned alongside an error is meaningless, so it is
		// neither cached nor handed to the consumer (same as in pollOnce).
		f.logger.Errorf("Error invoking fetch: %v", err)
	} else {
		f.cache = res
		f.deliver(res)
	}

	if f.ticker == nil {
		f.ticker = NewTimedTickProvider(f.currentInterval)
	}
	defer f.ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			f.logger.Printf("Fetcher stopped")
			return
		case <-f.ticker.C():
			f.pollOnce(ctx)
			// Apply whatever interval pollOnce settled on (unchanged when
			// the fetch failed).
			f.ticker.Reset(f.currentInterval)
		}
	}
}

// pollOnce fetches once, adjusts the polling interval according to whether
// the result differs from the cache, and forwards changed values to the
// consumer. A fetch error is logged and leaves cache and interval untouched.
func (f *Fetcher[T]) pollOnce(ctx context.Context) {
	result, err := f.fetchFunc(ctx)
	if err != nil {
		f.logger.Errorf("Error fetching data: %v", err)
		return
	}

	if result.Equal(f.cache) {
		f.updateFetchIntervalForNoObservedDiff()
		f.logger.Printf("No diff observed in fetch, not invoking the consumer")
		return
	}

	// The cache is updated before consuming, so a value whose consumption
	// fails is not re-delivered unless a later fetch differs again.
	f.cache = result
	f.updateFetchIntervalForObservedDiff()
	f.deliver(result)
}

// deliver passes data to the consumer, if one is configured, and logs any
// error it returns.
func (f *Fetcher[T]) deliver(data T) {
	if f.consumeFunc == nil {
		return
	}
	if err := f.consumeFunc(data); err != nil {
		f.logger.Errorf("Error consuming data: %v", err)
	}
}

// updateFetchIntervalForNoObservedDiff doubles the current interval, capped
// at maxInterval.
func (f *Fetcher[T]) updateFetchIntervalForNoObservedDiff() {
	const growthFactor = 2

	// Compare against maxInterval/growthFactor instead of multiplying first:
	// the result is the same as min(currentInterval*2, maxInterval), but the
	// multiplication can no longer overflow time.Duration for a very large
	// maxInterval.
	if f.currentInterval > f.maxInterval/growthFactor {
		f.currentInterval = f.maxInterval
		return
	}
	f.currentInterval *= growthFactor
}

// updateFetchIntervalForObservedDiff resets the current interval to
// minInterval.
func (f *Fetcher[T]) updateFetchIntervalForObservedDiff() {
	f.currentInterval = f.minInterval
}