func()

in aggregators/aggregator.go [369:402]


// aggregate queues a merge of cm under cmk on the pending batch and commits
// the batch once its length reaches dbCommitThresholdBytes.
//
// The key and value are marshaled directly into the buffers handed out by the
// deferred merge operation, so no intermediate encoding buffers are created
// here.
//
// It returns the encoded size of cm as reported by cm.SizeVT. The returned
// size is 0 when marshaling or finalizing the merge fails; when the merge was
// queued but committing or closing the batch fails, the size is returned
// together with the error. ctx is not used by this method.
func (a *Aggregator) aggregate(
	ctx context.Context,
	cmk CombinedMetricsKey,
	cm *aggregationpb.CombinedMetrics,
) (int, error) {
	if a.batch == nil {
		// Batch is backed by a sync pool. After each commit we will release the batch
		// back to the pool by calling Batch#Close and subsequently acquire a new batch.
		a.batch = a.db.NewBatch()
	}

	// Compute the encoded size once: it is needed both to reserve the value
	// buffer and to report the number of bytes ingested.
	bytesIn := cm.SizeVT()

	op := a.batch.MergeDeferred(cmk.SizeBinary(), bytesIn)
	if err := cmk.MarshalBinaryToSizedBuffer(op.Key); err != nil {
		return 0, fmt.Errorf("failed to marshal combined metrics key: %w", err)
	}
	if _, err := cm.MarshalToSizedBufferVT(op.Value); err != nil {
		return 0, fmt.Errorf("failed to marshal combined metrics: %w", err)
	}
	if err := op.Finish(); err != nil {
		return 0, fmt.Errorf("failed to finalize merge operation: %w", err)
	}

	if a.batch.Len() < dbCommitThresholdBytes {
		// Not enough data buffered yet; keep accumulating in the same batch.
		return bytesIn, nil
	}

	if err := a.batch.Commit(a.writeOptions); err != nil {
		return bytesIn, fmt.Errorf("failed to commit pebble batch: %w", err)
	}
	if err := a.batch.Close(); err != nil {
		return bytesIn, fmt.Errorf("failed to close pebble batch: %w", err)
	}
	// Drop the reference so the next call acquires a fresh batch.
	a.batch = nil
	return bytesIn, nil
}