func New()

in aggregators/aggregator.go [66:116]


func New(opts ...Option) (*Aggregator, error) {
	cfg, err := newConfig(opts...)
	if err != nil {
		return nil, fmt.Errorf("failed to create aggregation config: %w", err)
	}

	pebbleOpts := &pebble.Options{
		Merger: &pebble.Merger{
			Name: "combined_metrics_merger",
			Merge: func(_, value []byte) (pebble.ValueMerger, error) {
				merger := combinedMetricsMerger{
					limits:      cfg.Limits,
					constraints: newConstraints(cfg.Limits),
				}
				var pb aggregationpb.CombinedMetrics
				if err := pb.UnmarshalVT(value); err != nil {
					return nil, fmt.Errorf("failed to unmarshal metrics: %w", err)
				}
				merger.merge(&pb)
				return &merger, nil
			},
		},
	}
	writeOptions := pebble.Sync
	if cfg.InMemory {
		pebbleOpts.FS = vfs.NewMem()
		pebbleOpts.DisableWAL = true
		writeOptions = pebble.NoSync
	}
	pb, err := pebble.Open(cfg.DataDir, pebbleOpts)
	if err != nil {
		return nil, fmt.Errorf("failed to create pebble db: %w", err)
	}

	metrics, err := telemetry.NewMetrics(
		func() *pebble.Metrics { return pb.Metrics() },
		telemetry.WithMeter(cfg.Meter),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create metrics: %w", err)
	}

	return &Aggregator{
		db:             pb,
		writeOptions:   writeOptions,
		cfg:            cfg,
		processingTime: time.Now().Truncate(cfg.AggregationIntervals[0]),
		closed:         make(chan struct{}),
		metrics:        metrics,
	}, nil
}