plugins/outputs/cloudwatchlogs/internal/pusher/pool.go (92 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package pusher
import (
"sync"
"sync/atomic"
"time"
)
type WorkerPool interface {
Submit(task func())
Stop()
}
type workerPool struct {
tasks chan func()
workerCount atomic.Int32
wg sync.WaitGroup
stopCh chan struct{}
stopLock sync.RWMutex
}
// NewWorkerPool creates a pool of workers of the specified size.
func NewWorkerPool(size int) WorkerPool {
p := &workerPool{
tasks: make(chan func(), size*2),
stopCh: make(chan struct{}),
}
for i := 0; i < size; i++ {
p.addWorker()
}
return p
}
// addWorker creates and starts a new worker goroutine.
func (p *workerPool) addWorker() {
p.wg.Add(1)
p.workerCount.Add(1)
go p.worker()
}
// worker receives tasks from the channel and executes them.
func (p *workerPool) worker() {
defer func() {
p.workerCount.Add(-1)
p.wg.Done()
}()
for task := range p.tasks {
task()
}
}
// Submit adds a task to the pool. Blocks until a worker is available to receive the task or the pool is stopped.
func (p *workerPool) Submit(task func()) {
p.stopLock.RLock()
defer p.stopLock.RUnlock()
select {
case <-p.stopCh:
return
default:
select {
case p.tasks <- task:
case <-p.stopCh:
return
}
}
}
// WorkerCount keeps track of the available workers in the pool.
func (p *workerPool) WorkerCount() int32 {
return p.workerCount.Load()
}
// Stop closes the channels and waits for the workers to stop.
func (p *workerPool) Stop() {
p.stopLock.Lock()
defer p.stopLock.Unlock()
select {
case <-p.stopCh:
return
default:
close(p.stopCh)
close(p.tasks)
p.wg.Wait()
}
}
// senderPool wraps a Sender with a WorkerPool for concurrent sending.
type senderPool struct {
workerPool WorkerPool
sender Sender
}
var _ Sender = (*senderPool)(nil)
func newSenderPool(workerPool WorkerPool, sender Sender) Sender {
return &senderPool{
workerPool: workerPool,
sender: sender,
}
}
// Send submits a send task to the worker pool.
func (s *senderPool) Send(batch *logEventBatch) {
s.workerPool.Submit(func() {
s.sender.Send(batch)
})
}
// SetRetryDuration sets the retry duration on the wrapped Sender.
func (s *senderPool) SetRetryDuration(duration time.Duration) {
s.sender.SetRetryDuration(duration)
}
// RetryDuration returns the retry duration of the wrapped Sender.
func (s *senderPool) RetryDuration() time.Duration {
return s.sender.RetryDuration()
}