processor/lsmintervalprocessor/processor.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package lsmintervalprocessor // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor"

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/processor"
	"go.uber.org/zap"

	"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
	"github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger"
)

var _ processor.Metrics = (*Processor)(nil)

const (
	// pebbleMemTableSize defines the max steady state size of a memtable.
	// There can be more than one memtable in memory at a time as it takes
	// time for an old memtable to flush. The memtable size also defines
	// the size for large batches. A large batch is a batch which will
	// take at least half of the memtable size. Note that Batch#Len
	// is not the same as the memtable size that the batch will occupy,
	// as data in batches is encoded differently. In general, the
	// memtable size of the batch will be higher than the length of the
	// batch data.
	//
	// On commit, data in the large batch may be kept by pebble and thus
	// large batches will need to be reallocated. Note that large batch
	// classification uses the memtable size that a batch will occupy
	// rather than the length of the data slice backing the batch.
	pebbleMemTableSize = 32 << 20 // 32MB

	// pebbleMemTableStopWritesThreshold is the hard limit on the maximum
	// number of memtables that can be queued before writes are
	// stopped. This value should be at least 2, or writes will stop whenever
	// a memtable is being flushed.
	pebbleMemTableStopWritesThreshold = 2

	// dbCommitThresholdBytes is a soft limit; the batch is committed
	// to the DB as soon as it crosses this threshold. To make sure that
	// the commit threshold plays well with the max retained batch size,
	// the threshold should be kept smaller than the sum of the max retained
	// batch size and the encoded size of aggregated data to be committed.
	// However, this requires https://github.com/cockroachdb/pebble/pull/3139.
	// So, for now we are only tweaking the available options.
	dbCommitThresholdBytes = 8 << 20 // 8MB
)

type Processor struct {
	cfg                *config.Config
	sortedMetadataKeys []string

	db      *pebble.DB
	dataDir string
	dbOpts  *pebble.Options
	wOpts   *pebble.WriteOptions

	intervals []intervalDef
	next      consumer.Metrics

	bufferPool sync.Pool

	mu             sync.Mutex
	batch          *pebble.Batch
	processingTime time.Time

	ctx           context.Context
	cancel        context.CancelFunc
	exportStopped chan struct{}

	logger *zap.Logger
}

func newProcessor(cfg *config.Config, ivlDefs []intervalDef, log *zap.Logger, next consumer.Metrics) (*Processor, error) {
	dbOpts := &pebble.Options{
		Merger: &pebble.Merger{
			Name: "pmetrics_merger",
			Merge: func(key, value []byte) (pebble.ValueMerger, error) {
				v := merger.NewValue(
					cfg.ResourceLimit,
					cfg.ScopeLimit,
					cfg.MetricLimit,
					cfg.DatapointLimit,
					cfg.ExponentialHistogramMaxBuckets,
				)
				if err := v.Unmarshal(value); err != nil {
					return nil, fmt.Errorf("failed to unmarshal value from db: %w", err)
				}
				return merger.New(v), nil
			},
		},
		MemTableSize:                pebbleMemTableSize,
		MemTableStopWritesThreshold: pebbleMemTableStopWritesThreshold,
	}
	writeOpts := pebble.Sync
	dataDir := cfg.Directory
	if dataDir == "" {
		log.Info("no directory specified, switching to in-memory mode")
		dbOpts.FS = vfs.NewMem()
		dbOpts.DisableWAL = true
		writeOpts = pebble.NoSync
		dataDir = "/data" // will be created in the in-mem file-system
	}

	sortedMetadataKeys := append([]string{}, cfg.MetadataKeys...)
	sort.Strings(sortedMetadataKeys)

	ctx, cancel := context.WithCancel(context.Background())
	return &Processor{
		cfg:                cfg,
		sortedMetadataKeys: sortedMetadataKeys,
		dataDir:            dataDir,
		dbOpts:             dbOpts,
		wOpts:              writeOpts,
		intervals:          ivlDefs,
		next:               next,
		processingTime:     time.Now().UTC().Truncate(ivlDefs[0].Duration),
		ctx:                ctx,
		cancel:             cancel,
		logger:             log,
	}, nil
}
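// The sketch below is illustrative only and is not used by the processor: with
// a throwaway in-memory store and a hypothetical byte-concatenating merger, it
// shows how pebble's merge operator combines repeated writes to the same key.
// This is the same mechanism newProcessor configures above, except that the
// real merger aggregates metrics via merger.Value rather than appending bytes.

// concatMerger is a minimal pebble.ValueMerger used only by exampleInMemoryMerge.
type concatMerger struct{ buf []byte }

func (m *concatMerger) MergeNewer(value []byte) error {
	m.buf = append(m.buf, value...)
	return nil
}

func (m *concatMerger) MergeOlder(value []byte) error {
	m.buf = append(append([]byte{}, value...), m.buf...)
	return nil
}

func (m *concatMerger) Finish(includesBase bool) ([]byte, error) {
	return m.buf, nil
}

// exampleInMemoryMerge merges two values under one key and reads back the
// combined result produced by concatMerger.
func exampleInMemoryMerge() ([]byte, error) {
	db, err := pebble.Open("/example", &pebble.Options{
		FS:         vfs.NewMem(),
		DisableWAL: true,
		Merger: &pebble.Merger{
			Name: "concat_merger",
			Merge: func(_, value []byte) (pebble.ValueMerger, error) {
				return &concatMerger{buf: append([]byte{}, value...)}, nil
			},
		},
	})
	if err != nil {
		return nil, err
	}
	defer db.Close()

	// Two merges on the same key are combined by the merge operator on read.
	if err := db.Merge([]byte("k"), []byte("a"), pebble.NoSync); err != nil {
		return nil, err
	}
	if err := db.Merge([]byte("k"), []byte("b"), pebble.NoSync); err != nil {
		return nil, err
	}
	value, closer, err := db.Get([]byte("k"))
	if err != nil {
		return nil, err
	}
	defer closer.Close()
	return append([]byte{}, value...), nil
}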
func (p *Processor) Start(ctx context.Context, host component.Host) error {
	p.mu.Lock()
	if p.db == nil {
		db, err := pebble.Open(p.dataDir, p.dbOpts)
		if err != nil {
			p.mu.Unlock()
			return fmt.Errorf("failed to open database: %w", err)
		}
		p.db = db
	}
	if p.exportStopped == nil {
		p.exportStopped = make(chan struct{})
	}
	p.mu.Unlock()

	go func() {
		defer close(p.exportStopped)

		to := p.processingTime.Add(p.intervals[0].Duration)
		timer := time.NewTimer(time.Until(to))
		defer timer.Stop()
		for {
			select {
			case <-p.ctx.Done():
				return
			case <-timer.C:
			}

			p.mu.Lock()
			batch := p.batch
			p.batch = nil
			p.processingTime = to
			p.mu.Unlock()

			// Export the batch
			if err := p.commitAndExport(p.ctx, batch, to); err != nil {
				p.logger.Warn("failed to export", zap.Error(err), zap.Time("end_time", to))
			}

			to = to.Add(p.intervals[0].Duration)
			timer.Reset(time.Until(to))
		}
	}()
	return nil
}

func (p *Processor) Shutdown(ctx context.Context) error {
	defer p.logger.Info("shutdown finished")

	// Signal stop for the exporting goroutine
	p.cancel()
	// Wait for the exporting goroutine to stop. Note that we don't need to acquire
	// the mutex here: even if there is a race between Start and Shutdown, the
	// processor context is cancelled, ensuring the export goroutine is a noop.
	if p.exportStopped != nil {
		select {
		case <-ctx.Done():
			return fmt.Errorf("failed to shutdown due to context timeout while waiting for export to stop: %w", ctx.Err())
		case <-p.exportStopped:
		}
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// Ensure all data in the database is exported
	if p.db != nil {
		p.logger.Info("exporting all data before shutting down")
		if p.batch != nil {
			if err := p.batch.Commit(p.wOpts); err != nil {
				return fmt.Errorf("failed to commit batch: %w", err)
			}
			if err := p.batch.Close(); err != nil {
				return fmt.Errorf("failed to close batch: %w", err)
			}
			p.batch = nil
		}

		var errs []error
		for _, ivl := range p.intervals {
			// At any particular time there will be one export candidate for
			// each aggregation interval. We will align the end time and
			// process each of these.
			to := p.processingTime.Truncate(ivl.Duration).Add(ivl.Duration)
			if err := p.export(ctx, to); err != nil {
				errs = append(errs, fmt.Errorf(
					"failed to export metrics for interval %s: %w", ivl.Duration, err),
				)
			}
		}
		if len(errs) > 0 {
			return fmt.Errorf("failed while running final export: %w", errors.Join(errs...))
		}
		if err := p.db.Close(); err != nil {
			return fmt.Errorf("failed to close database: %w", err)
		}
		// All future operations are invalid after db is closed
		p.db = nil
	}
	return nil
}

func (p *Processor) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{MutatesData: false}
}
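// The sketch below is illustrative only and is not part of the processor: it
// shows how a caller could attach client metadata to the context so that
// ConsumeMetrics (below) picks up the values for the configured metadata keys
// and folds them into the aggregation key. The "x-tenant-id" key and its value
// are hypothetical.
func exampleClientMetadataContext(ctx context.Context) context.Context {
	info := client.FromContext(ctx)
	info.Metadata = client.NewMetadata(map[string][]string{
		"x-tenant-id": {"tenant-1"}, // hypothetical metadata key/value
	})
	return client.NewContext(ctx, info)
}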
func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	v := merger.NewValue(
		p.cfg.ResourceLimit,
		p.cfg.ScopeLimit,
		p.cfg.MetricLimit,
		p.cfg.DatapointLimit,
		p.cfg.ExponentialHistogramMaxBuckets,
	)
	var errs []error

	nextMD := pmetric.NewMetrics()
	rms := md.ResourceMetrics()
	for i := 0; i < rms.Len(); i++ {
		var nextMDResourceMetrics pmetric.ResourceMetrics
		rm := rms.At(i)
		sms := rm.ScopeMetrics()
		for i := 0; i < sms.Len(); i++ {
			var nextMDScopeMetrics pmetric.ScopeMetrics
			sm := sms.At(i)
			ms := sm.Metrics()
			for i := 0; i < ms.Len(); i++ {
				m := ms.At(i)
				switch t := m.Type(); t {
				case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge:
					// TODO (lahsivjar): implement support for gauges
					//
					// For now, pass through by copying across to nextMD below.
					break
				case pmetric.MetricTypeSummary:
					if p.cfg.PassThrough.Summary {
						// Copy across to nextMD below.
						break
					}
					if err := v.MergeMetric(rm, sm, m); err != nil {
						errs = append(errs, err)
					}
					continue
				case pmetric.MetricTypeSum, pmetric.MetricTypeHistogram, pmetric.MetricTypeExponentialHistogram:
					if err := v.MergeMetric(rm, sm, m); err != nil {
						errs = append(errs, err)
					}
					continue
				default:
					// All metric types are handled, this is unexpected
					errs = append(errs, fmt.Errorf("unexpected metric type, dropping: %d", t))
					continue
				}

				if nextMDScopeMetrics == (pmetric.ScopeMetrics{}) {
					if nextMDResourceMetrics == (pmetric.ResourceMetrics{}) {
						nextMDResourceMetrics = nextMD.ResourceMetrics().AppendEmpty()
						rm.Resource().CopyTo(nextMDResourceMetrics.Resource())
						nextMDResourceMetrics.SetSchemaUrl(rm.SchemaUrl())
					}
					nextMDScopeMetrics = nextMDResourceMetrics.ScopeMetrics().AppendEmpty()
					sm.Scope().CopyTo(nextMDScopeMetrics.Scope())
					nextMDScopeMetrics.SetSchemaUrl(sm.SchemaUrl())
				}
				m.CopyTo(nextMDScopeMetrics.Metrics().AppendEmpty())
			}
		}
	}

	mb, ok := p.bufferPool.Get().(*mergeBuffer)
	if !ok {
		mb = &mergeBuffer{}
	}
	defer p.bufferPool.Put(mb)

	var err error
	mb.value, err = v.AppendBinary(mb.value[:0])
	if err != nil {
		return errors.Join(append(errs, fmt.Errorf("failed to marshal value to proto binary: %w", err))...)
	}

	clientInfo := client.FromContext(ctx)
	clientMetadata := make([]merger.KeyValues, 0, len(p.sortedMetadataKeys))
	for _, k := range p.sortedMetadataKeys {
		if values := clientInfo.Metadata.Get(k); len(values) != 0 {
			clientMetadata = append(clientMetadata, merger.KeyValues{
				Key:    k,
				Values: values,
			})
		}
	}

	if err := p.mergeToBatch(mb, clientMetadata); err != nil {
		return fmt.Errorf("failed to merge the value to batch: %w", err)
	}

	// Call next for the metrics remaining in the input
	if err := p.next.ConsumeMetrics(ctx, nextMD); err != nil {
		errs = append(errs, err)
	}
	if len(errs) > 0 {
		return errors.Join(errs...)
	}
	return nil
}

func (p *Processor) mergeToBatch(mb *mergeBuffer, clientMetadata []merger.KeyValues) (err error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.batch == nil {
		p.batch = newBatch(p.db)
	}

	for _, ivl := range p.intervals {
		key := merger.Key{
			Interval:       ivl.Duration,
			ProcessingTime: p.processingTime,
			Metadata:       clientMetadata,
		}
		var err error
		mb.key, err = key.AppendBinary(mb.key[:0])
		if err != nil {
			return fmt.Errorf("failed to marshal key to binary for ivl %s: %w", ivl.Duration, err)
		}
		if err := p.batch.Merge(mb.key, mb.value, nil); err != nil {
			return fmt.Errorf("failed to merge to db: %w", err)
		}
	}

	if p.batch.Len() >= dbCommitThresholdBytes {
		if err := p.batch.Commit(p.wOpts); err != nil {
			return fmt.Errorf("failed to commit a batch to db: %w", err)
		}
		if err := p.batch.Close(); err != nil {
			return fmt.Errorf("failed to close a batch post commit: %w", err)
		}
		p.batch = nil
	}
	return nil
}

type mergeBuffer struct {
	key   []byte
	value []byte
}

// commitAndExport commits the batch to the DB and exports all aggregated metrics in
// the provided range bounded by `to`. If the batch fails to commit, a corresponding
// error is returned; however, the export still proceeds.
func (p *Processor) commitAndExport(ctx context.Context, batch *pebble.Batch, to time.Time) error {
	var errs []error
	if batch != nil {
		if err := batch.Commit(p.wOpts); err != nil {
			errs = append(errs, fmt.Errorf("failed to commit batch before export: %w", err))
		}
		if err := batch.Close(); err != nil {
			errs = append(errs, fmt.Errorf("failed to close batch before export: %w", err))
		}
	}
	if err := p.export(ctx, to); err != nil {
		errs = append(errs, fmt.Errorf("failed to export: %w", err))
	}
	if len(errs) > 0 {
		return errors.Join(errs...)
	}
	return nil
}

func (p *Processor) export(ctx context.Context, end time.Time) error {
	snap := p.db.NewSnapshot()
	defer snap.Close()

	var errs []error
	for _, ivl := range p.intervals {
		// Check if the given aggregation interval needs to be exported now
		if end.Truncate(ivl.Duration).Equal(end) {
			start := end.Add(-ivl.Duration)
			exportedCount, err := p.exportForInterval(ctx, snap, start, end, ivl)
			if err != nil {
				errs = append(errs, fmt.Errorf("failed to export interval %s for end time %d: %w", ivl.Duration, end.Unix(), err))
			}
			p.logger.Debug(
				"Finished exporting metrics",
				zap.Int("exported_datapoints", exportedCount),
				zap.Duration("interval", ivl.Duration),
				zap.Time("exported_till(exclusive)", end),
				zap.Error(err),
			)
		}
	}
	return errors.Join(errs...)
}
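// The helper below is an illustrative sketch and is not used by the processor:
// it restates the boundary check in export above. Because the export end time
// advances in steps of the smallest configured interval, a larger interval is
// exported only when the end time lands exactly on that interval's boundary.
func isExportBoundary(end time.Time, ivl time.Duration) bool {
	return end.Truncate(ivl).Equal(end)
}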
func (p *Processor) exportForInterval(
	ctx context.Context,
	snap *pebble.Snapshot,
	start, end time.Time,
	ivl intervalDef,
) (int, error) {
	var boundsBuffer []byte
	from := merger.Key{Interval: ivl.Duration, ProcessingTime: start}
	boundsBuffer, err := from.AppendBinary(nil)
	if err != nil {
		return 0, fmt.Errorf("failed to encode range: %w", err)
	}
	lb := boundsBuffer[:]
	to := merger.Key{Interval: ivl.Duration, ProcessingTime: end}
	boundsBuffer, err = to.AppendBinary(boundsBuffer)
	if err != nil {
		return 0, fmt.Errorf("failed to encode range: %w", err)
	}
	ub := boundsBuffer[len(lb):]

	iter, err := snap.NewIter(&pebble.IterOptions{
		LowerBound: lb,
		UpperBound: ub,
		KeyTypes:   pebble.IterKeyTypePointsOnly,
	})
	if err != nil {
		return 0, fmt.Errorf("failed to create iterator: %w", err)
	}
	defer iter.Close()

	var errs []error
	var exportedDPCount int
	rangeHasData := iter.First()
	for ; iter.Valid(); iter.Next() {
		v := merger.NewValue(
			p.cfg.ResourceLimit,
			p.cfg.ScopeLimit,
			p.cfg.MetricLimit,
			p.cfg.DatapointLimit,
			p.cfg.ExponentialHistogramMaxBuckets,
		)
		var key merger.Key
		if err := key.Unmarshal(iter.Key()); err != nil {
			errs = append(errs, fmt.Errorf("failed to decode key from database: %w", err))
			continue
		}
		if err := v.Unmarshal(iter.Value()); err != nil {
			errs = append(errs, fmt.Errorf("failed to decode value from database: %w", err))
			continue
		}
		finalMetrics, err := v.Finalize()
		if err != nil {
			errs = append(errs, fmt.Errorf("failed to finalize merged metric: %w", err))
			continue
		}

		resourceMetrics := finalMetrics.ResourceMetrics()
		if ivl.Statements != nil {
			for i := 0; i < resourceMetrics.Len(); i++ {
				res := resourceMetrics.At(i)
				scopeMetrics := res.ScopeMetrics()
				for j := 0; j < scopeMetrics.Len(); j++ {
					scope := scopeMetrics.At(j)
					metrics := scope.Metrics()
					for k := 0; k < metrics.Len(); k++ {
						metric := metrics.At(k)
						executeTransform := func(dp any) {
							dCtx := ottldatapoint.NewTransformContext(dp, metric, metrics, scope.Scope(), res.Resource(), scope, res)
							if err := ivl.Statements.Execute(ctx, dCtx); err != nil {
								errs = append(errs, fmt.Errorf("failed to execute ottl statement for interval %s: %w", ivl.Duration, err))
							}
						}
						// TODO (lahsivjar): add exhaustive:enforce lint rule
						//exhaustive:enforce
						switch metric.Type() {
						case pmetric.MetricTypeGauge:
							dps := metric.Gauge().DataPoints()
							for l := 0; l < dps.Len(); l++ {
								executeTransform(dps.At(l))
							}
						case pmetric.MetricTypeSum:
							dps := metric.Sum().DataPoints()
							for l := 0; l < dps.Len(); l++ {
								executeTransform(dps.At(l))
							}
						case pmetric.MetricTypeSummary:
							dps := metric.Summary().DataPoints()
							for l := 0; l < dps.Len(); l++ {
								executeTransform(dps.At(l))
							}
						case pmetric.MetricTypeHistogram:
							dps := metric.Histogram().DataPoints()
							for l := 0; l < dps.Len(); l++ {
								executeTransform(dps.At(l))
							}
						case pmetric.MetricTypeExponentialHistogram:
							dps := metric.ExponentialHistogram().DataPoints()
							for l := 0; l < dps.Len(); l++ {
								executeTransform(dps.At(l))
							}
						}
					}
				}
			}
		}

		if n := len(key.Metadata); n != 0 {
			metadataMap := make(map[string][]string, n)
			for _, kvs := range key.Metadata {
				metadataMap[kvs.Key] = kvs.Values
			}
			info := client.FromContext(ctx)
			info.Metadata = client.NewMetadata(metadataMap)
			ctx = client.NewContext(ctx, info)
		}

		if err := p.next.ConsumeMetrics(ctx, finalMetrics); err != nil {
			errs = append(errs, fmt.Errorf("failed to consume the decoded value: %w", err))
			continue
		}
		exportedDPCount += finalMetrics.DataPointCount()
	}
	if rangeHasData {
		if err := p.db.DeleteRange(lb, ub, p.wOpts); err != nil {
			errs = append(errs, fmt.Errorf("failed to delete exported entries: %w", err))
		}
	}
	if len(errs) > 0 {
		return exportedDPCount, errors.Join(errs...)
	}
	return exportedDPCount, nil
}

func newBatch(db *pebble.DB) *pebble.Batch {
	// TODO (lahsivjar): Optimize batch as per our needs
	// Requires release of https://github.com/cockroachdb/pebble/pull/3139
	return db.NewBatch()
}
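// The sketch below is illustrative only and is not used by the processor: it
// distils how exportForInterval derives the [lower, upper) key range for one
// interval and end time, the same bounds used both for the snapshot iterator
// and for the DeleteRange call that drops exported entries.
func exampleExportBounds(ivl time.Duration, end time.Time) (lb, ub []byte, err error) {
	from := merger.Key{Interval: ivl, ProcessingTime: end.Add(-ivl)}
	lb, err = from.AppendBinary(nil)
	if err != nil {
		return nil, nil, err
	}
	to := merger.Key{Interval: ivl, ProcessingTime: end}
	ub, err = to.AppendBinary(nil)
	if err != nil {
		return nil, nil, err
	}
	return lb, ub, nil
}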