aggregators/config.go:

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package aggregators

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"

	"github.com/elastic/apm-aggregation/aggregationpb"
)

const instrumentationName = "aggregators"

// Processor defines handling of the aggregated metrics post harvest.
// The CombinedMetrics passed to the processor is pooled and is released
// back to the pool after the processor has returned. If the processor
// mutates the CombinedMetrics such that it can no longer access the
// pooled objects, then the Processor should release the objects back
// to the pool.
type Processor func(
	ctx context.Context,
	cmk CombinedMetricsKey,
	cm *aggregationpb.CombinedMetrics,
	aggregationIvl time.Duration,
) error
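
// As an illustrative sketch (not part of this package), a Processor must
// finish all access to the pooled CombinedMetrics before returning:
//
//	var harvested int
//	processor := func(
//		ctx context.Context,
//		cmk CombinedMetricsKey,
//		cm *aggregationpb.CombinedMetrics,
//		aggregationIvl time.Duration,
//	) error {
//		// Safe: cm is only read before returning. Retaining cm, or any
//		// object reachable from it, after return is unsafe because it is
//		// released back to the pool once the processor returns.
//		harvested++
//		return nil
//	}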

// config contains the required config for running the aggregator.
type config struct {
	DataDir                string
	Limits                 Limits
	Processor              Processor
	Partitions             uint16
	AggregationIntervals   []time.Duration
	HarvestDelay           time.Duration
	Lookback               time.Duration
	CombinedMetricsIDToKVs func([16]byte) []attribute.KeyValue
	InMemory               bool
	Meter                  metric.Meter
	Tracer                 trace.Tracer
	Logger                 *zap.Logger
	OverflowLogging        bool
}

// Option allows configuring the aggregator using functional options.
type Option func(config) config

// newConfig creates a new aggregator config based on the passed options.
func newConfig(opts ...Option) (config, error) {
	cfg := defaultCfg()
	for _, opt := range opts {
		cfg = opt(cfg)
	}
	return cfg, validateCfg(cfg)
}
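
// For illustration, options compose as follows (the values are assumptions,
// chosen only to satisfy validateCfg):
//
//	cfg, err := newConfig(
//		WithDataDir("/var/lib/aggregator"),
//		WithPartitions(4),
//		WithAggregationIntervals([]time.Duration{
//			time.Minute, 10 * time.Minute, time.Hour,
//		}),
//	)
//	if err != nil {
//		// handle invalid configuration
//	}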

// WithDataDir configures the data directory to be used by the database.
func WithDataDir(dataDir string) Option {
	return func(c config) config {
		c.DataDir = dataDir
		return c
	}
}

// WithLimits configures the limits to be used by the aggregator.
func WithLimits(limits Limits) Option {
	return func(c config) config {
		c.Limits = limits
		return c
	}
}

// WithProcessor configures the processor for handling of the aggregated
// metrics post harvest. The processor is called for each decoded combined
// metrics after they are harvested. The CombinedMetrics passed to the
// processor is pooled and is released back to the pool after the processor
// has returned. If the processor mutates the CombinedMetrics such that it
// can no longer access the pooled objects, then the Processor should
// release the objects back to the pool.
func WithProcessor(processor Processor) Option {
	return func(c config) config {
		c.Processor = processor
		return c
	}
}

// WithPartitions configures the number of partitions for combined metrics
// written to pebble. Defaults to 1.
//
// Partition IDs are encoded such that all partitions of a specific
// combined metric sort before those of any other combined metric when
// compared using the bytes comparer.
func WithPartitions(n uint16) Option {
	return func(c config) config {
		c.Partitions = n
		return c
	}
}
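
// As an illustration of the ordering property (the key layout shown is
// simplified and assumed, not the actual encoding), keys for combined
// metrics A and B with 3 partitions iterate in this order under the
// bytes comparer:
//
//	(A, partition 0), (A, partition 1), (A, partition 2),
//	(B, partition 0), (B, partition 1), (B, partition 2)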

// WithAggregationIntervals defines the intervals for which the aggregator
// will aggregate. The intervals must be in ascending order and multiples
// of the lowest interval (see validateCfg).
func WithAggregationIntervals(aggIvls []time.Duration) Option {
	return func(c config) config {
		c.AggregationIntervals = aggIvls
		return c
	}
}

// WithHarvestDelay delays the harvest by the configured duration.
// This means that a harvest for a specific processing time will be
// performed with the given delay.
//
// Without delay, a normal harvest schedule will harvest metrics
// aggregated for a processing time, say `t0`, at time `t1`, where
// `t1 = t0 + aggregation_interval`. With a delay of, say `d`, the
// harvester will harvest the metrics for `t0` at `t1 + d`. In
// addition to the harvest time, the duration for which the metrics
// are aggregated by the AggregateBatch API is also affected.
//
// The main purpose of the delay is to handle the latency of
// receiving the l1 aggregated metrics in l2 aggregation. Thus
// the value must be configured for the l2 aggregator and is
// not required for the l1 aggregator. If used as such, the
// harvest delay has no effect on the duration for which the
// metrics are aggregated. This is because the AggregateBatch API
// is not used by the l2 aggregator.
func WithHarvestDelay(delay time.Duration) Option {
	return func(c config) config {
		c.HarvestDelay = delay
		return c
	}
}
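
// A worked example, assuming a 1m aggregation interval and a 30s delay
// (the values are illustrative):
//
//	ivl := time.Minute
//	delay := 30 * time.Second
//	t0 := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) // processing time
//	harvestAt := t0.Add(ivl).Add(delay)                // 12:01:30 instead of 12:01:00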

// WithLookback configures the maximum duration that the
// aggregator will use to query the database during harvest time,
// in addition to the original period derived from the aggregation
// interval, i.e. the harvest interval for each aggregation interval
// will be defined as [end-Lookback-AggregationIvl, end).
//
// The main purpose of Lookback is to protect against data loss in
// multi-level deployments of aggregators where AggregateCombinedMetrics
// is used to aggregate partial aggregates. In these cases, the
// Lookback configuration can protect against data loss due to
// delayed partial aggregates. Note that these delayed partial
// aggregates will only be aggregated with other delayed partial
// aggregates, and thus we can have multiple aggregated metrics for
// the same CombinedMetricsKey{Interval, ProcessingTime, ID}.
func WithLookback(lookback time.Duration) Option {
	return func(c config) config {
		c.Lookback = lookback
		return c
	}
}
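
// A worked example of the harvest window (the values are illustrative):
// with AggregationIvl = 1m and Lookback = 5m, a harvest ending at `end`
// queries the database over [end-6m, end) rather than [end-1m, end):
//
//	ivl := time.Minute
//	lookback := 5 * time.Minute
//	end := time.Now()
//	start := end.Add(-(lookback + ivl)) // harvest window is [start, end)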

// WithMeter defines a custom meter which will be used for collecting
// telemetry. Defaults to the meter provided by the global provider.
func WithMeter(meter metric.Meter) Option {
	return func(c config) config {
		c.Meter = meter
		return c
	}
}

// WithTracer defines a custom tracer which will be used for collecting
// traces. Defaults to the tracer provided by the global provider.
func WithTracer(tracer trace.Tracer) Option {
	return func(c config) config {
		c.Tracer = tracer
		return c
	}
}

// WithCombinedMetricsIDToKVs defines a function that converts a combined
// metrics ID to zero or more attribute.KeyValue for telemetry.
func WithCombinedMetricsIDToKVs(f func([16]byte) []attribute.KeyValue) Option {
	return func(c config) config {
		c.CombinedMetricsIDToKVs = f
		return c
	}
}

// WithLogger defines a custom logger to be used by the aggregator.
func WithLogger(logger *zap.Logger) Option {
	return func(c config) config {
		c.Logger = logger
		return c
	}
}

// WithOverflowLogging enables warning logs at harvest time when overflows
// have occurred.
//
// Logging of overflows is disabled by default, as most callers are expected
// to rely on metrics to surface cardinality issues. Support for logging
// exists for historical reasons.
func WithOverflowLogging(enabled bool) Option {
	return func(c config) config {
		c.OverflowLogging = enabled
		return c
	}
}

// WithInMemory defines whether the aggregator uses an in-memory file system.
func WithInMemory(enabled bool) Option {
	return func(c config) config {
		c.InMemory = enabled
		return c
	}
}

func defaultCfg() config {
	return config{
		DataDir:                "/tmp",
		Processor:              stdoutProcessor,
		Partitions:             1,
		AggregationIntervals:   []time.Duration{time.Minute},
		Meter:                  otel.Meter(instrumentationName),
		Tracer:                 otel.Tracer(instrumentationName),
		CombinedMetricsIDToKVs: func(_ [16]byte) []attribute.KeyValue { return nil },
		Logger:                 zap.Must(zap.NewDevelopment()),
	}
}

func validateCfg(cfg config) error {
	if cfg.DataDir == "" {
		return errors.New("data directory is required")
	}
	if cfg.Processor == nil {
		return errors.New("processor is required")
	}
	if cfg.Partitions == 0 {
		return errors.New("partitions must be greater than zero")
	}
	if len(cfg.AggregationIntervals) == 0 {
		return errors.New("at least one aggregation interval is required")
	}
	if !sort.SliceIsSorted(cfg.AggregationIntervals, func(i, j int) bool {
		return cfg.AggregationIntervals[i] < cfg.AggregationIntervals[j]
	}) {
		return errors.New("aggregation intervals must be in ascending order")
	}
	lowest := cfg.AggregationIntervals[0]
	highest := cfg.AggregationIntervals[len(cfg.AggregationIntervals)-1]
	for i := 1; i < len(cfg.AggregationIntervals); i++ {
		ivl := cfg.AggregationIntervals[i]
		if ivl%lowest != 0 {
			return errors.New("aggregation intervals must be a multiple of the lowest interval")
		}
	}
	// For encoding/decoding the processing time for combined metrics we only
	// consider seconds granularity, making 1 second the lowest possible
	// aggregation interval. We also encode the interval as 2 unsigned bytes,
	// making 65535 seconds (~18.2 hours) the highest possible aggregation
	// interval.
	if lowest < time.Second {
		return errors.New("aggregation interval less than one second is not supported")
	}
	if highest > 18*time.Hour {
		return errors.New("aggregation interval greater than 18 hours is not supported")
	}
	return nil
}
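
// For illustration (the values are assumptions), these intervals pass
// validation, since they are ascending and multiples of the lowest:
//
//	[]time.Duration{time.Minute, 10 * time.Minute, time.Hour}
//
// while these fail, since 90s is not a multiple of 1m:
//
//	[]time.Duration{time.Minute, 90 * time.Second}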

func stdoutProcessor(
	ctx context.Context,
	cmk CombinedMetricsKey,
	_ *aggregationpb.CombinedMetrics,
	_ time.Duration,
) error {
	fmt.Printf("Received combined metrics with key: %+v\n", cmk)
	return nil
}