in aggregators/aggregator.go [282:347]
func (a *Aggregator) Close(ctx context.Context) error {
ctx, span := a.cfg.Tracer.Start(ctx, "Aggregator.Close")
defer span.End()
a.mu.Lock()
defer a.mu.Unlock()
// Signal shutdown by closing a.closed. The non-blocking select makes this
// idempotent: if the channel is already closed we skip the close to avoid
// a double-close panic.
select {
case <-a.closed:
default:
a.cfg.Logger.Info("stopping aggregator")
close(a.closed)
}
// Wait for the run loop (if it was ever started) to observe the shutdown
// signal and exit, bounded by the caller's context.
// NOTE(review): a.mu is held while waiting on runStopped — confirm the run
// goroutine never needs a.mu to finish, otherwise this could deadlock.
if a.runStopped != nil {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled while waiting for run to complete: %w", ctx.Err())
case <-a.runStopped:
}
}
if a.db != nil {
a.cfg.Logger.Info("running final aggregation")
// Flush any pending writes before harvesting, so the final harvest sees
// all buffered data. The batch is nil-ed so later code cannot reuse it.
if a.batch != nil {
if err := a.batch.Commit(a.writeOptions); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to commit batch: %w", err)
}
if err := a.batch.Close(); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to close batch: %w", err)
}
a.batch = nil
}
// Perform one final harvest per configured aggregation interval,
// collecting errors so every interval gets a chance to flush.
var errs []error
for _, ivl := range a.cfg.AggregationIntervals {
// At any particular time there will be 1 harvest candidate for
// each aggregation interval. We will align the end time and
// process each of these.
//
// TODO (lahsivjar): It is possible to harvest the same
// time multiple times, not an issue but can be optimized.
to := a.processingTime.Truncate(ivl).Add(ivl)
if err := a.harvest(ctx, to, a.cachedEvents.loadAndDelete(to)); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf(
"failed to harvest metrics for interval %s: %w", formatDuration(ivl), err),
)
}
}
// NOTE(review): on harvest failure we return with a.db still open and
// non-nil — presumably so a retry of Close can run; confirm intended.
if len(errs) > 0 {
return fmt.Errorf("failed while running final harvest: %w", errors.Join(errs...))
}
if err := a.db.Close(); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to close pebble: %w", err)
}
// All future operations are invalid after db is closed
a.db = nil
}
// Release instrumentation resources last, after all harvesting is done.
if err := a.metrics.CleanUp(); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to cleanup instrumentation: %w", err)
}
return nil
}