func()

in aggregators/aggregator.go [282:347]


func (a *Aggregator) Close(ctx context.Context) error {
	ctx, span := a.cfg.Tracer.Start(ctx, "Aggregator.Close")
	defer span.End()

	a.mu.Lock()
	defer a.mu.Unlock()

	select {
	case <-a.closed:
	default:
		a.cfg.Logger.Info("stopping aggregator")
		close(a.closed)
	}
	if a.runStopped != nil {
		select {
		case <-ctx.Done():
			return fmt.Errorf("context cancelled while waiting for run to complete: %w", ctx.Err())
		case <-a.runStopped:
		}
	}

	if a.db != nil {
		a.cfg.Logger.Info("running final aggregation")
		if a.batch != nil {
			if err := a.batch.Commit(a.writeOptions); err != nil {
				span.RecordError(err)
				return fmt.Errorf("failed to commit batch: %w", err)
			}
			if err := a.batch.Close(); err != nil {
				span.RecordError(err)
				return fmt.Errorf("failed to close batch: %w", err)
			}
			a.batch = nil
		}
		var errs []error
		for _, ivl := range a.cfg.AggregationIntervals {
			// At any particular time there will be 1 harvest candidate for
			// each aggregation interval. We will align the end time and
			// process each of these.
			//
			// TODO (lahsivjar): It is possible to harvest the same
			// time multiple times, not an issue but can be optimized.
			to := a.processingTime.Truncate(ivl).Add(ivl)
			if err := a.harvest(ctx, to, a.cachedEvents.loadAndDelete(to)); err != nil {
				span.RecordError(err)
				errs = append(errs, fmt.Errorf(
					"failed to harvest metrics for interval %s: %w", formatDuration(ivl), err),
				)
			}
		}
		if len(errs) > 0 {
			return fmt.Errorf("failed while running final harvest: %w", errors.Join(errs...))
		}
		if err := a.db.Close(); err != nil {
			span.RecordError(err)
			return fmt.Errorf("failed to close pebble: %w", err)
		}
		// All future operations are invalid after db is closed
		a.db = nil
	}
	if err := a.metrics.CleanUp(); err != nil {
		span.RecordError(err)
		return fmt.Errorf("failed to cleanup instrumentation: %w", err)
	}
	return nil
}