in aggregators/aggregator.go [66:116]
// New returns a new Aggregator configured by the given options.
//
// It builds the aggregation config from opts, opens a pebble database at the
// configured data directory with a merge operator that combines
// CombinedMetrics values, and sets up telemetry backed by the database's
// metrics. If any step fails, a wrapped error is returned and no Aggregator
// is created; a database that was already opened is closed before returning.
func New(opts ...Option) (*Aggregator, error) {
	cfg, err := newConfig(opts...)
	if err != nil {
		return nil, fmt.Errorf("failed to create aggregation config: %w", err)
	}

	pebbleOpts := &pebble.Options{
		Merger: &pebble.Merger{
			Name: "combined_metrics_merger",
			// The first argument is ignored; only the value is needed to
			// seed the merger.
			Merge: func(_, value []byte) (pebble.ValueMerger, error) {
				merger := combinedMetricsMerger{
					limits:      cfg.Limits,
					constraints: newConstraints(cfg.Limits),
				}
				var cm aggregationpb.CombinedMetrics
				if err := cm.UnmarshalVT(value); err != nil {
					return nil, fmt.Errorf("failed to unmarshal metrics: %w", err)
				}
				merger.merge(&cm)
				return &merger, nil
			},
		},
	}

	// Writes are synced by default. In-memory mode uses an in-memory
	// filesystem with the WAL disabled, so writes are not synced.
	writeOptions := pebble.Sync
	if cfg.InMemory {
		pebbleOpts.FS = vfs.NewMem()
		pebbleOpts.DisableWAL = true
		writeOptions = pebble.NoSync
	}

	db, err := pebble.Open(cfg.DataDir, pebbleOpts)
	if err != nil {
		return nil, fmt.Errorf("failed to create pebble db: %w", err)
	}

	metrics, err := telemetry.NewMetrics(
		func() *pebble.Metrics { return db.Metrics() },
		telemetry.WithMeter(cfg.Meter),
	)
	if err != nil {
		// The database is already open at this point; close it so the
		// handle is not leaked when construction fails.
		if closeErr := db.Close(); closeErr != nil {
			return nil, fmt.Errorf(
				"failed to create metrics: %w (additionally failed to close pebble db: %v)",
				err, closeErr,
			)
		}
		return nil, fmt.Errorf("failed to create metrics: %w", err)
	}

	return &Aggregator{
		db:           db,
		writeOptions: writeOptions,
		cfg:          cfg,
		// NOTE(review): assumes newConfig guarantees at least one
		// aggregation interval; indexing [0] panics otherwise — confirm.
		processingTime: time.Now().Truncate(cfg.AggregationIntervals[0]),
		closed:         make(chan struct{}),
		metrics:        metrics,
	}, nil
}