in processor/lsmintervalprocessor/processor.go [100:148]
// newProcessor builds a Processor from the given configuration, interval
// definitions, logger and downstream metrics consumer.
//
// It only prepares the pebble options and write options that the processor
// will use; the database itself is not opened here. When cfg.Directory is
// empty the processor is configured for in-memory operation.
//
// ivlDefs must contain at least one entry: the first interval's duration is
// used to align the initial processing time. An error is returned when
// ivlDefs is empty.
func newProcessor(cfg *config.Config, ivlDefs []intervalDef, log *zap.Logger, next consumer.Metrics) (*Processor, error) {
	// Guard before touching ivlDefs[0] below; without it an empty slice
	// would panic instead of surfacing a configuration error.
	if len(ivlDefs) == 0 {
		return nil, fmt.Errorf("at least one interval must be defined")
	}

	dbOpts := &pebble.Options{
		Merger: &pebble.Merger{
			Name: "pmetrics_merger",
			// Merge decodes the stored value into a merger.Value bounded by
			// the configured limits and wraps it in a value merger.
			Merge: func(key, value []byte) (pebble.ValueMerger, error) {
				v := merger.NewValue(
					cfg.ResourceLimit,
					cfg.ScopeLimit,
					cfg.MetricLimit,
					cfg.DatapointLimit,
					cfg.ExponentialHistogramMaxBuckets,
				)
				if err := v.Unmarshal(value); err != nil {
					return nil, fmt.Errorf("failed to unmarshal value from db: %w", err)
				}
				return merger.New(v), nil
			},
		},
		MemTableSize:                pebbleMemTableSize,
		MemTableStopWritesThreshold: pebbleMemTableStopWritesThreshold,
	}

	writeOpts := pebble.Sync
	dataDir := cfg.Directory
	if dataDir == "" {
		// No on-disk location configured: back the store with an in-memory
		// file system, and skip the WAL and synced writes accordingly.
		log.Info("no directory specified, switching to in-memory mode")
		dbOpts.FS = vfs.NewMem()
		dbOpts.DisableWAL = true
		writeOpts = pebble.NoSync
		dataDir = "/data" // will be created in the in-mem file-system
	}

	// Sort a copy so the caller's cfg.MetadataKeys slice is left untouched.
	sortedMetadataKeys := append([]string{}, cfg.MetadataKeys...)
	sort.Strings(sortedMetadataKeys)

	ctx, cancel := context.WithCancel(context.Background())
	return &Processor{
		cfg:                cfg,
		sortedMetadataKeys: sortedMetadataKeys,
		dataDir:            dataDir,
		dbOpts:             dbOpts,
		wOpts:              writeOpts,
		intervals:          ivlDefs,
		next:               next,
		// Align the starting processing time to a boundary of the first
		// interval's duration.
		processingTime: time.Now().UTC().Truncate(ivlDefs[0].Duration),
		ctx:            ctx,
		cancel:         cancel,
		logger:         log,
	}, nil
}