plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go (53 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package pusher
import (
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/aws/amazon-cloudwatch-agent/logs"
)
// Pusher connects the Queue to the Sender.
type Pusher struct {
Target
Queue
Service cloudWatchLogsService
TargetManager TargetManager
EntityProvider logs.LogEntityProvider
Sender Sender
}
// NewPusher creates a new Pusher instance with a new Queue and Sender. Calls PutRetentionPolicy using the
// TargetManager.
func NewPusher(
logger telegraf.Logger,
target Target,
service cloudWatchLogsService,
targetManager TargetManager,
entityProvider logs.LogEntityProvider,
workerPool WorkerPool,
flushTimeout time.Duration,
retryDuration time.Duration,
stop <-chan struct{},
wg *sync.WaitGroup,
) *Pusher {
s := createSender(logger, service, targetManager, workerPool, retryDuration, stop)
q := newQueue(logger, target, flushTimeout, entityProvider, s, stop, wg)
targetManager.PutRetentionPolicy(target)
return &Pusher{
Target: target,
Queue: q,
Service: service,
TargetManager: targetManager,
EntityProvider: entityProvider,
Sender: s,
}
}
// createSender initializes a Sender. Wraps it in a senderPool if a WorkerPool is provided.
func createSender(
logger telegraf.Logger,
service cloudWatchLogsService,
targetManager TargetManager,
workerPool WorkerPool,
retryDuration time.Duration,
stop <-chan struct{},
) Sender {
s := newSender(logger, service, targetManager, retryDuration, stop)
if workerPool == nil {
return s
}
return newSenderPool(workerPool, s)
}