subscriber/common/tools/batcher.go (75 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"sync"
"time"
)
// Batcher collects individually submitted tasks into batches and hands those batches to
// workers, which execute them asynchronously.
// Every batch holds a power-of-two number of tasks. Pending tasks are flushed once
// maxBatchSize of them have accumulated, or once the timestamp supplied with the oldest
// pending task is at least maxDelay behind the time reported by now.
type Batcher struct {
	// taskQueue carries tasks from Add to the batching goroutine.
	taskQueue chan batcherTask
	// batchQueue carries assembled batches from the batching goroutine to the workers.
	batchQueue chan []interface{}

	// maxBatchSize is the number of pending tasks that triggers a flush.
	maxBatchSize int
	// maxDelay is the age of the oldest pending task that triggers a flush.
	maxDelay time.Duration
	// now reports the current time; task age is measured against it.
	now func() time.Time

	// workerWG tracks the workers started through StartWorker; Close waits on it.
	workerWG sync.WaitGroup
}
// batcherTask is a queued task paired with the timestamp that was passed to Add.
type batcherTask struct {
// task is the caller-supplied payload; it is copied into the outgoing batch unchanged.
task interface{}
// time is the caller-supplied timestamp; the batching loop compares it against now()
// to decide whether maxDelay has elapsed for this task.
time time.Time
}
// NewBatcher creates a batcher and starts its background batching goroutine.
// The batcher is created without workers: StartWorker must be called before any task
// can be processed, because both internal channels are unbuffered and batches are only
// consumed by workers.
//
// maxBatchSize is the number of pending tasks that triggers a flush. Values below 1 are
// treated as 1, since a non-positive limit would make the batching loop try to flush an
// empty buffer and panic.
// maxDelay is the maximum age of the oldest pending task before a flush is forced.
// now reports the current time and may be replaced in tests; nil defaults to time.Now.
func NewBatcher(maxBatchSize int, maxDelay time.Duration, now func() time.Time) *Batcher {
	if maxBatchSize < 1 {
		maxBatchSize = 1
	}
	if now == nil {
		now = time.Now
	}

	batcher := &Batcher{
		taskQueue:    make(chan batcherTask),
		batchQueue:   make(chan []interface{}),
		maxBatchSize: maxBatchSize,
		maxDelay:     maxDelay,
		now:          now,
	}
	// The goroutine exits once Close closes taskQueue and the remaining tasks are flushed.
	go batcher.run()
	return batcher
}
// StartWorker registers one more worker with the batcher and launches it on its own goroutine.
// run is handed the channel on which batches are delivered together with the batcher's
// wait group. It is expected to consume batches from the channel and to call Done on the
// wait group when it exits.
func (b *Batcher) StartWorker(run func(chan []interface{}, *sync.WaitGroup)) {
	workers := &b.workerWG
	// Register before launching: the worker's Done must never precede this Add.
	workers.Add(1)
	go run(b.batchQueue, workers)
}
// Add submits a task to the batcher for asynchronous processing.
// time is the timestamp recorded with the task; the batching loop measures the task's age
// from it when deciding whether maxDelay has elapsed.
// The call blocks until the batching goroutine has accepted the task.
func (b *Batcher) Add(task interface{}, time time.Time) {
	queued := batcherTask{task: task, time: time}
	b.taskQueue <- queued
}
// Close shuts the batcher down: it closes the task queue, which makes the batching
// goroutine flush whatever is still pending and then close the batch channel, and then
// blocks until every worker registered through StartWorker has called Done.
// Close must be called at most once, and Add must not be called afterwards; either would
// operate on an already closed channel and panic.
func (b *Batcher) Close() {
	tasks, workers := b.taskQueue, &b.workerWG
	close(tasks)
	workers.Wait()
}
// createBatch copies the leading tasks of buffer into a new batch.
// The batch length is the largest power of two that does not exceed len(buffer), so the
// caller is expected to drop len(batch) elements from the front of buffer afterwards.
// buffer must hold at least one task.
func createBatch(buffer []batcherTask) []interface{} {
	// Keep doubling for as long as the doubled size still fits in the buffer.
	size := 1
	for size*2 <= len(buffer) {
		size *= 2
	}

	tasks := make([]interface{}, 0, size)
	for i := 0; i < size; i++ {
		tasks = append(tasks, buffer[i].task)
	}
	return tasks
}
// run is the batching loop started by NewBatcher. It accumulates tasks received from
// taskQueue and forwards them to batchQueue in power-of-two sized batches.
//
// Pending tasks are flushed when their count reaches maxBatchSize, or when the timestamp
// of the oldest pending task is at least maxDelay behind now(). The delay condition is
// re-evaluated whenever a task arrives and on every tick of a 10ms ticker, so a delay
// flush can lag by up to one tick.
//
// When taskQueue is closed, run flushes everything still pending, closes batchQueue so
// that workers ranging over it terminate, and returns. Sends on batchQueue block until a
// worker receives, so run makes no progress while no worker is consuming.
func (b *Batcher) run() {
	// A stoppable ticker rather than time.Tick, so its resources are released when the
	// loop exits.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	var buffer []batcherTask
	// flushOne sends the next power-of-two sized batch and drops it from the buffer.
	// It must only be called while the buffer is non-empty.
	flushOne := func() {
		batch := createBatch(buffer)
		b.batchQueue <- batch
		buffer = buffer[len(batch):]
	}

	for {
		select {
		case task, ok := <-b.taskQueue:
			if !ok {
				// Flush all remaining tasks on quit.
				for len(buffer) > 0 {
					flushOne()
				}
				close(b.batchQueue)
				return
			}
			buffer = append(buffer, task)
		case <-ticker.C:
			// Nothing to receive; fall through to re-check the delay condition.
		}

		// Flush while either the size limit or the delay limit is reached. The
		// len(buffer) > 0 guard keeps a non-positive maxBatchSize from triggering a flush
		// of an empty buffer, which would panic in createBatch.
		for len(buffer) > 0 && (len(buffer) >= b.maxBatchSize || b.now().Sub(buffer[0].time) >= b.maxDelay) {
			flushOne()
		}
	}
}