in aggregators/aggregator.go [369:402]
func (a *Aggregator) aggregate(
ctx context.Context,
cmk CombinedMetricsKey,
cm *aggregationpb.CombinedMetrics,
) (int, error) {
if a.batch == nil {
// Batch is backed by a sync pool. After each commit we will release the batch
// back to the pool by calling Batch#Close and subsequently acquire a new batch.
a.batch = a.db.NewBatch()
}
op := a.batch.MergeDeferred(cmk.SizeBinary(), cm.SizeVT())
if err := cmk.MarshalBinaryToSizedBuffer(op.Key); err != nil {
return 0, fmt.Errorf("failed to marshal combined metrics key: %w", err)
}
if _, err := cm.MarshalToSizedBufferVT(op.Value); err != nil {
return 0, fmt.Errorf("failed to marshal combined metrics: %w", err)
}
if err := op.Finish(); err != nil {
return 0, fmt.Errorf("failed to finalize merge operation: %w", err)
}
bytesIn := cm.SizeVT()
if a.batch.Len() >= dbCommitThresholdBytes {
if err := a.batch.Commit(a.writeOptions); err != nil {
return bytesIn, fmt.Errorf("failed to commit pebble batch: %w", err)
}
if err := a.batch.Close(); err != nil {
return bytesIn, fmt.Errorf("failed to close pebble batch: %w", err)
}
a.batch = nil
}
return bytesIn, nil
}