periodic/ratelimited.go (69 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package periodic
import (
"sync"
"sync/atomic"
"time"
)
// Doer limits an action to be executed at most once within a specified period.
// It is intended for managing events that occur frequently, but instead of an
// action being taken for every event, the action should be executed at most
// once within a given period of time.
//
// Doer takes a function to execute, doFn, which is called every time
// the specified period has elapsed with the number of events and the period.
type Doer struct {
count atomic.Uint64
period time.Duration
// doFn is called for executing the action every period if at least one
// event happened. It receives the count of events and the period.
doFn func(count uint64, d time.Duration)
lastDone time.Time
done chan struct{}
// nowFn is used to acquire the current time instead of time.Now so it can
// be mocked for tests.
nowFn func() time.Time
// newTickerFn is used to acquire a *time.Ticker instead of time.NewTicker
// so it can be mocked for tests.
newTickerFn func(duration time.Duration) *time.Ticker
started atomic.Bool
wg sync.WaitGroup
ticker *time.Ticker
}
// NewDoer returns a new Doer. It takes a doFn, which is
// called with the current count of events and the period.
func NewDoer(period time.Duration, doFn func(count uint64, d time.Duration)) *Doer {
return &Doer{
period: period,
doFn: doFn,
nowFn: time.Now,
newTickerFn: time.NewTicker,
}
}
func (r *Doer) Add() {
r.count.Add(1)
}
func (r *Doer) AddN(n uint64) {
r.count.Add(n)
}
func (r *Doer) Start() {
if r.started.Swap(true) {
return
}
r.done = make(chan struct{})
r.lastDone = r.nowFn()
r.ticker = r.newTickerFn(r.period)
r.wg.Add(1)
go func() {
defer r.wg.Done()
defer r.ticker.Stop()
for {
select {
case <-r.ticker.C:
r.do()
case <-r.done:
r.do()
return
}
}
}()
}
func (r *Doer) Stop() {
if !r.started.Swap(false) {
return
}
close(r.done)
r.wg.Wait()
}
func (r *Doer) do() {
count := r.count.Swap(0)
if count == 0 {
return
}
r.lastDone = r.nowFn()
r.doFn(count, r.period)
}