utils/dedup/limiter.go (95 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dedup
import (
"sync"
"time"
"github.com/andres-erbsen/clock"
)
// TaskGCInterval is the interval in which garbage collection of old tasks runs.
const TaskGCInterval = time.Minute
// TaskRunner runs against some input and produces some output w/ a ttl.
type TaskRunner interface {
Run(input interface{}) (output interface{}, ttl time.Duration)
}
type task struct {
input interface{}
cond *sync.Cond
running bool
output interface{}
expiresAt time.Time
}
func newTask(input interface{}) *task {
return &task{
input: input,
cond: sync.NewCond(new(sync.Mutex)),
}
}
func (t *task) expired(now time.Time) bool {
return now.After(t.expiresAt)
}
// Limiter deduplicates the running of a common task within a given limit. Tasks
// are deduplicated based on input equality.
type Limiter struct {
sync.RWMutex
clk clock.Clock
runner TaskRunner
tasks map[interface{}]*task
gc *IntervalTrap
}
// NewLimiter creates a new Limiter for tasks. The limit is determined per task
// via the TaskRunner.
func NewLimiter(clk clock.Clock, runner TaskRunner) *Limiter {
l := &Limiter{
clk: clk,
runner: runner,
tasks: make(map[interface{}]*task),
}
l.gc = NewIntervalTrap(TaskGCInterval, clk, &limiterTaskGC{l})
return l
}
// Run runs a task with input.
func (l *Limiter) Run(input interface{}) interface{} {
l.gc.Trap()
l.RLock()
t, ok := l.tasks[input]
l.RUnlock()
if !ok {
// Slow path, must initialize task struct under global write lock.
l.Lock()
t, ok = l.tasks[input]
if !ok {
t = newTask(input)
l.tasks[input] = t
}
l.Unlock()
}
return l.getOutput(t)
}
func (l *Limiter) getOutput(t *task) interface{} {
t.cond.L.Lock()
if !t.expired(l.clk.Now()) {
defer t.cond.L.Unlock()
return t.output
}
if t.running {
t.cond.Wait()
defer t.cond.L.Unlock()
return t.output
}
t.running = true
t.cond.L.Unlock()
output, ttl := l.runner.Run(t.input)
t.cond.L.Lock()
t.output = output
t.expiresAt = l.clk.Now().Add(ttl)
t.running = false
t.cond.L.Unlock()
t.cond.Broadcast()
return output
}
type limiterTaskGC struct {
limiter *Limiter
}
func (gc *limiterTaskGC) Run() {
gc.limiter.Lock()
defer gc.limiter.Unlock()
for input, t := range gc.limiter.tasks {
t.cond.L.Lock()
expired := t.expired(gc.limiter.clk.Now()) && !t.running
t.cond.L.Unlock()
if expired {
delete(gc.limiter.tasks, input)
}
}
}