router/internal/graphqlmetrics/exporter.go (301 lines of code) (raw):
package graphqlmetrics
import (
"context"
"errors"
"fmt"
"time"
"connectrpc.com/connect"
"github.com/cloudflare/backoff"
graphqlmetrics "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1"
"github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1/graphqlmetricsv1connect"
"go.uber.org/atomic"
"go.uber.org/zap"
)
// Exporter queues GraphQL schema usage records and publishes them to the
// metrics collector through client. Records enqueued asynchronously are
// grouped into batches by the goroutine that NewExporter launches.
type Exporter struct {
// settings holds the batching, retry and timeout configuration.
settings *ExporterSettings
// logger is the component-scoped logger used for all exporter output.
logger *zap.Logger
// client is the RPC client used to publish metrics to the collector.
client graphqlmetricsv1connect.GraphQLMetricsServiceClient
// apiToken is sent as a Bearer token in the Authorization header of every export request.
apiToken string
// shutdownSignal is closed by Shutdown to tell the start loop to drain the queue and exit.
shutdownSignal chan struct{}
// acceptTrafficSema is closed by Shutdown; once closed, asynchronous RecordUsage calls are rejected.
acceptTrafficSema chan struct{}
// queue buffers individual usage records until the start loop collects them into a batch.
queue chan *graphqlmetrics.SchemaUsageInfo
// inflightBatches counts the batch export goroutines that have not finished yet.
// Shutdown polls it to know when all pending exports are done.
inflightBatches *atomic.Int64
// exportRequestContext is the parent context of every export request, so that
// all requests started before the shutdown can be canceled at once.
exportRequestContext context.Context
// cancelAllExportRequests cancels exportRequestContext. It is called when
// Shutdown returns, which cancels every export request still running.
cancelAllExportRequests context.CancelFunc
}
// RetryOptions configures how a failed batch export is retried.
type RetryOptions struct {
// Enabled turns retrying on; when false a failed export is logged and dropped.
Enabled bool
// MaxDuration is passed to the backoff calculator as its first argument
// (presumably the upper bound of a single backoff delay; confirm against the backoff package).
MaxDuration time.Duration
// Interval is passed to the backoff calculator as its second argument
// (presumably the base delay the backoff grows from; confirm against the backoff package).
Interval time.Duration
// MaxRetry bounds the number of retry attempts made after the initial export fails.
MaxRetry int
}
// Default values used by NewDefaultExporterSettings.
const (
	// defaultExportTimeout is the timeout of a single export request.
	defaultExportTimeout = 10 * time.Second
	// defaultExportRetryMaxDuration is the default RetryOptions.MaxDuration.
	defaultExportRetryMaxDuration = 10 * time.Second
	// defaultExportRetryInterval is the default RetryOptions.Interval.
	defaultExportRetryInterval = 5 * time.Second
	// defaultExportMaxRetryAttempts is the default RetryOptions.MaxRetry.
	defaultExportMaxRetryAttempts = 5
	// defaultMaxBatchItems is the default number of items per batch.
	defaultMaxBatchItems = 1024
	// defaultMaxQueueSize is the default capacity of the item queue.
	defaultMaxQueueSize = 10 * 1024
	// defaultBatchInterval is the default interval between periodic flushes.
	defaultBatchInterval = 10 * time.Second
)
// ExporterSettings configures batching, queueing, retrying and timeouts of an Exporter.
type ExporterSettings struct {
// BatchSize is the maximum number of items to be sent in a single batch.
// A batch is dispatched as soon as it holds this many items.
BatchSize int
// QueueSize is the maximum number of items (not batches) that may wait in the
// queue at a given time. Items recorded while the queue is full are dropped.
QueueSize int
// Interval is the interval at which a partially filled batch is flushed.
Interval time.Duration
// RetryOptions is the retry configuration for failed batch exports.
RetryOptions RetryOptions
// ExportTimeout is the timeout applied to each individual export request.
ExportTimeout time.Duration
}
// NewDefaultExporterSettings returns a freshly allocated ExporterSettings
// populated with the package defaults and with retries enabled.
func NewDefaultExporterSettings() *ExporterSettings {
	retry := RetryOptions{
		Enabled:     true,
		MaxRetry:    defaultExportMaxRetryAttempts,
		MaxDuration: defaultExportRetryMaxDuration,
		Interval:    defaultExportRetryInterval,
	}

	settings := new(ExporterSettings)
	settings.BatchSize = defaultMaxBatchItems
	settings.QueueSize = defaultMaxQueueSize
	settings.Interval = defaultBatchInterval
	settings.ExportTimeout = defaultExportTimeout
	settings.RetryOptions = retry
	return settings
}
// NewExporter creates a new GraphQL metrics exporter and starts its background
// batching goroutine.
//
// client is the RPC client used to publish metrics to the collector and apiToken
// is the token used to authenticate with it. The underlying queue implementation
// sends batches of metrics at the interval and batch size given in settings.
//
// An error is returned, and no goroutine is started, when settings is nil or
// fails validation.
func NewExporter(logger *zap.Logger, client graphqlmetricsv1connect.GraphQLMetricsServiceClient, apiToken string, settings *ExporterSettings) (*Exporter, error) {
	if settings == nil {
		return nil, errors.New("exporter settings must not be nil")
	}

	e := &Exporter{
		logger:            logger.With(zap.String("component", "graphqlmetrics_exporter")),
		settings:          settings,
		client:            client,
		apiToken:          apiToken,
		shutdownSignal:    make(chan struct{}),
		acceptTrafficSema: make(chan struct{}),
		inflightBatches:   atomic.NewInt64(0),
	}

	// Validate before allocating the queue: make(chan T, n) panics for a negative n,
	// which would preempt the "queue size must be positive" validation error.
	if err := e.validate(); err != nil {
		return nil, err
	}

	e.queue = make(chan *graphqlmetrics.SchemaUsageInfo, settings.QueueSize)
	// Create the cancelable context only once construction can no longer fail,
	// so an error return never leaves an un-canceled context behind.
	e.exportRequestContext, e.cancelAllExportRequests = context.WithCancel(context.Background())

	go e.start()
	return e, nil
}
// validate checks that every numeric setting of the exporter is strictly
// positive. It returns an error describing the first offending setting, or nil
// when the configuration is usable.
func (e *Exporter) validate() error {
	cfg := e.settings
	switch {
	case cfg.BatchSize <= 0:
		return errors.New("batch size must be positive")
	case cfg.QueueSize <= 0:
		return errors.New("queue size must be positive")
	case cfg.Interval <= 0:
		return errors.New("interval must be positive")
	case cfg.ExportTimeout <= 0:
		return errors.New("export timeout must be positive")
	case cfg.RetryOptions.MaxDuration <= 0:
		return errors.New("retry max duration must be positive")
	case cfg.RetryOptions.Interval <= 0:
		return errors.New("retry interval must be positive")
	case cfg.RetryOptions.MaxRetry <= 0:
		return errors.New("retry max retry must be positive")
	}
	return nil
}
// acceptTraffic reports whether the exporter still accepts new items.
//
// acceptTrafficSema never carries values: while it is open the receive below
// cannot proceed and the default branch is taken; once it has been closed the
// receive succeeds immediately on every call.
func (e *Exporter) acceptTraffic() bool {
	select {
	case <-e.acceptTrafficSema:
		// channel closed: the exporter is shutting down
		return false
	default:
	}
	return true
}
// RecordUsage records a single schema usage item and reports whether it was
// accepted.
//
// When synchronous is true the item is exported immediately on the calling
// goroutine, bypassing the queue; ok reports whether that export succeeded.
// Otherwise the item is enqueued without blocking; ok is false when the
// exporter no longer accepts traffic (Shutdown was called) or the queue is
// full, in which case the item is dropped.
func (e *Exporter) RecordUsage(usageInfo *graphqlmetrics.SchemaUsageInfo, synchronous bool) (ok bool) {
	if synchronous {
		// sendItems already logs the failure; surface the outcome to the caller
		// instead of unconditionally claiming success.
		err := e.sendItems([]*graphqlmetrics.SchemaUsageInfo{usageInfo})
		return err == nil
	}

	if !e.acceptTraffic() {
		return false
	}

	// Non-blocking send: never stall the request path on a full queue.
	select {
	case e.queue <- usageInfo:
		return true
	default:
		e.logger.Warn("RecordAsync: Queue is full, dropping item")
		return false
	}
}
// sendItems publishes the given usage items to the collector in a single,
// non-aggregated request and returns the RPC error, if any.
//
// The request runs under the exporter-wide export context, additionally bounded
// by ExportTimeout when that is positive, and carries the API token as a Bearer
// Authorization header.
func (e *Exporter) sendItems(items []*graphqlmetrics.SchemaUsageInfo) error {
	count := len(items)
	e.logger.Debug("sending batch", zap.Int("size", count))

	ctx := e.exportRequestContext
	if timeout := e.settings.ExportTimeout; timeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(e.exportRequestContext, timeout)
		defer cancel()
		ctx = timeoutCtx
	}

	payload := &graphqlmetrics.PublishGraphQLRequestMetricsRequest{SchemaUsage: items}
	req := connect.NewRequest(payload)
	req.Header().Set("Authorization", "Bearer "+e.apiToken)

	if _, err := e.client.PublishGraphQLMetrics(ctx, req); err != nil {
		e.logger.Debug("Failed to export batch", zap.Error(err), zap.Int("batch_size", count))
		return err
	}

	e.logger.Debug("Successfully exported batch", zap.Int("batch_size", count))
	return nil
}
// sendAggregation publishes one aggregated metrics request to the collector and
// returns the RPC error, if any.
//
// The call runs under ctx, additionally bounded by ExportTimeout when that is
// positive, and carries the API token as a Bearer Authorization header.
func (e *Exporter) sendAggregation(ctx context.Context, request *graphqlmetrics.PublishAggregatedGraphQLRequestMetricsRequest) error {
	count := len(request.Aggregation)
	e.logger.Debug("sendAggregation", zap.Int("size", count))

	if timeout := e.settings.ExportTimeout; timeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		ctx = timeoutCtx
	}

	req := connect.NewRequest(request)
	req.Header().Set("Authorization", "Bearer "+e.apiToken)

	if _, err := e.client.PublishAggregatedGraphQLMetrics(ctx, req); err != nil {
		e.logger.Debug("sendAggregation failed", zap.Error(err), zap.Int("batch_size", count))
		return err
	}

	e.logger.Debug("sendAggregation success", zap.Int("batch_size", count))
	return nil
}
// prepareAndSendBatch hands the batch to a new goroutine that aggregates and
// exports it, and returns without waiting for the export to finish.
func (e *Exporter) prepareAndSendBatch(batch []*graphqlmetrics.SchemaUsageInfo) {
	e.logger.Debug("Exporter.prepareAndSendBatch", zap.Int("batch_size", len(batch)))

	// Count the batch before the goroutine is spawned so that Shutdown never
	// observes zero in-flight batches while this one is still pending.
	e.inflightBatches.Inc()
	go func(items []*graphqlmetrics.SchemaUsageInfo) {
		defer e.inflightBatches.Dec()
		e.aggregateAndSendBatch(items)
	}(batch)
}
// aggregateAndSendBatch aggregates the batch into a single request and
// publishes it to the collector on the calling goroutine.
//
// When the first attempt fails and retries are enabled, the export is retried
// at most RetryOptions.MaxRetry times, waiting a backoff-derived delay before
// each retry. Retrying stops early when the collector answers with an
// unauthenticated error, or when the export context is canceled (Shutdown
// cancels it on return). Failures are logged; nothing is returned to the caller.
func (e *Exporter) aggregateAndSendBatch(batch []*graphqlmetrics.SchemaUsageInfo) {
	ctx := e.exportRequestContext
	request := AggregateSchemaUsageInfoBatch(batch)
	batchSize := zap.Int("batch_size", len(request.Aggregation))

	// unauthenticated reports whether err is an RPC error that retrying cannot fix.
	unauthenticated := func(err error) bool {
		var connectErr *connect.Error
		return errors.As(err, &connectErr) && connectErr.Code() == connect.CodeUnauthenticated
	}

	err := e.sendAggregation(ctx, request)
	if err == nil {
		return
	}
	if unauthenticated(err) {
		e.logger.Error("Failed to export batch due to unauthenticated error, not retrying",
			zap.Error(err),
			batchSize,
		)
		return
	}
	if !e.settings.RetryOptions.Enabled {
		e.logger.Error("Failed to export batch",
			zap.Error(err),
			batchSize,
		)
		return
	}

	b := backoff.New(e.settings.RetryOptions.MaxDuration, e.settings.RetryOptions.Interval)
	lastErr := err
	retry := 0
	// Strictly less-than: the loop body increments retry before attempting, so
	// this performs exactly MaxRetry retries (not MaxRetry+1).
	for retry < e.settings.RetryOptions.MaxRetry {
		retry++

		sleepDuration := b.Duration()
		e.logger.Debug(fmt.Sprintf("Retrying export in %s ...", sleepDuration.String()),
			batchSize,
			zap.Int("retry", retry),
			zap.Duration("sleep", sleepDuration),
		)

		// Wait for the backoff period, but give up immediately once the export
		// context is canceled so the goroutine does not outlive the shutdown.
		timer := time.NewTimer(sleepDuration)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			e.logger.Error("Failed to export batch, retries aborted because export requests were canceled",
				zap.Error(lastErr),
				batchSize,
				zap.Int("retries", retry-1),
			)
			return
		}

		err = e.sendAggregation(ctx, request)
		if err == nil {
			return
		}
		if unauthenticated(err) {
			e.logger.Error("Failed to export batch due to unauthenticated error, not retrying",
				zap.Error(err),
				batchSize,
			)
			return
		}
		lastErr = err
	}

	e.logger.Error("Failed to export batch after retries",
		zap.Error(lastErr),
		batchSize,
		zap.Int("retries", retry),
	)
}
// start runs the batching loop and blocks until the exporter is shut down.
//
// Items received from the queue are collected into a buffer that is dispatched
// when it reaches BatchSize items or, if non-empty, on every tick of the flush
// interval. When the shutdown signal fires, the remaining buffer and queue
// contents are drained and the loop exits.
func (e *Exporter) start() {
	e.logger.Debug("Starting exporter")

	flushTicker := time.NewTicker(e.settings.Interval)
	// Deferred calls run in reverse order: the ticker is stopped first, then the stop is logged.
	defer e.logger.Debug("Exporter stopped")
	defer flushTicker.Stop()

	pending := make([]*graphqlmetrics.SchemaUsageInfo, 0, e.settings.BatchSize)
	// dispatch hands the pending items over for export and starts a fresh buffer,
	// because the dispatched slice is now owned by the export goroutine.
	dispatch := func() {
		e.prepareAndSendBatch(pending)
		pending = make([]*graphqlmetrics.SchemaUsageInfo, 0, e.settings.BatchSize)
	}

	for {
		select {
		case <-e.shutdownSignal:
			e.logger.Debug("Exporter.start: shutdown")
			e.drainQueue(pending)
			return
		case next := <-e.queue:
			e.logger.Debug("Exporter.start: item")
			pending = append(pending, next)
			if len(pending) == e.settings.BatchSize {
				dispatch()
			}
		case <-flushTicker.C:
			e.logger.Debug("Exporter.start: tick")
			if len(pending) != 0 {
				dispatch()
			}
		}
	}
}
// drainQueue empties the queue without blocking and dispatches everything that
// is left for export. buffer holds the items already collected by the caller;
// queued items are appended to it and a batch is dispatched every time it
// reaches BatchSize items. Once the queue is empty, a final partial batch is
// dispatched if any items remain.
func (e *Exporter) drainQueue(buffer []*graphqlmetrics.SchemaUsageInfo) {
	e.logger.Debug("Exporter.closeAndDrainQueue")

	var drained int
drain:
	for {
		select {
		case next := <-e.queue:
			drained++
			buffer = append(buffer, next)
			if len(buffer) != e.settings.BatchSize {
				continue
			}
			e.prepareAndSendBatch(buffer)
			// the dispatched slice now belongs to the export goroutine
			buffer = make([]*graphqlmetrics.SchemaUsageInfo, 0, e.settings.BatchSize)
		default:
			// nothing left in the queue
			break drain
		}
	}

	if len(buffer) != 0 {
		e.prepareAndSendBatch(buffer)
	}
	e.logger.Debug("Exporter.closeAndDrainQueue: done", zap.Int("drained_items", drained))
}
// Shutdown stops the exporter and waits until all in-flight export batches have
// finished or ctx is done, whichever comes first.
//
// It first stops accepting new items, then signals the batching goroutine to
// drain the queue and send the remaining items, and finally polls the in-flight
// counter every 100ms. It returns nil once no batch is in flight, or ctx.Err()
// when ctx is done first. In both cases all export requests still running are
// canceled on return.
//
// Shutdown must be called at most once: it closes channels, and closing an
// already closed channel panics.
func (e *Exporter) Shutdown(ctx context.Context) error {
	poll := time.NewTicker(100 * time.Millisecond)
	// Deferred calls run in reverse order: stop the ticker, cancel all export
	// requests, then log completion.
	defer e.logger.Debug("Exporter.Shutdown: done")
	defer e.cancelAllExportRequests()
	defer poll.Stop()

	// Order matters: reject new items before asking the batching goroutine to drain.
	close(e.acceptTrafficSema)
	close(e.shutdownSignal)

	// A WaitGroup is not used here because waiting on one cannot be bounded by
	// a context; the in-flight counter is polled instead.
	for {
		select {
		case <-poll.C:
			if e.inflightBatches.Load() != 0 {
				continue
			}
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}