func newProcessor()

in processor/lsmintervalprocessor/processor.go [100:148]


func newProcessor(cfg *config.Config, ivlDefs []intervalDef, log *zap.Logger, next consumer.Metrics) (*Processor, error) {
	dbOpts := &pebble.Options{
		Merger: &pebble.Merger{
			Name: "pmetrics_merger",
			Merge: func(key, value []byte) (pebble.ValueMerger, error) {
				v := merger.NewValue(
					cfg.ResourceLimit,
					cfg.ScopeLimit,
					cfg.MetricLimit,
					cfg.DatapointLimit,
					cfg.ExponentialHistogramMaxBuckets,
				)
				if err := v.Unmarshal(value); err != nil {
					return nil, fmt.Errorf("failed to unmarshal value from db: %w", err)
				}
				return merger.New(v), nil
			},
		},
		MemTableSize:                pebbleMemTableSize,
		MemTableStopWritesThreshold: pebbleMemTableStopWritesThreshold,
	}
	writeOpts := pebble.Sync
	dataDir := cfg.Directory
	if dataDir == "" {
		log.Info("no directory specified, switching to in-memory mode")
		dbOpts.FS = vfs.NewMem()
		dbOpts.DisableWAL = true
		writeOpts = pebble.NoSync
		dataDir = "/data" // will be created in the in-mem file-system
	}

	sortedMetadataKeys := append([]string{}, cfg.MetadataKeys...)
	sort.Strings(sortedMetadataKeys)

	ctx, cancel := context.WithCancel(context.Background())
	return &Processor{
		cfg:                cfg,
		sortedMetadataKeys: sortedMetadataKeys,
		dataDir:            dataDir,
		dbOpts:             dbOpts,
		wOpts:              writeOpts,
		intervals:          ivlDefs,
		next:               next,
		processingTime:     time.Now().UTC().Truncate(ivlDefs[0].Duration),
		ctx:                ctx,
		cancel:             cancel,
		logger:             log,
	}, nil
}