in processor/lsmintervalprocessor/processor.go [196:250]
// Shutdown stops the processor and flushes whatever is still held in the
// database.
//
// The sequence is: cancel the processor context, wait for the export
// goroutine to report completion on exportStopped (giving up when ctx is
// done), commit and close the pending batch if there is one, run one final
// export per configured interval, and close the database.
//
// A batch commit/close failure or a database close failure is returned
// immediately. Failures from the final exports are collected across all
// intervals and returned joined; in that case the database is not closed and
// p.db is left set.
func (p *Processor) Shutdown(ctx context.Context) error {
	// Logged on every return path, including the error ones.
	defer p.logger.Info("shutdown finished")

	// Ask the exporting goroutine to stop.
	p.cancel()

	// Block until the exporting goroutine has stopped or the caller's context
	// expires. The mutex is intentionally not taken for this check: even if
	// Start and Shutdown race, the processor context is already cancelled, so
	// the export goroutine ends up being a noop.
	if p.exportStopped != nil {
		select {
		case <-p.exportStopped:
		case <-ctx.Done():
			return fmt.Errorf("failed to shutdown due to context timeout while waiting for export to stop: %w", ctx.Err())
		}
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// Without a database there is nothing left to flush.
	if p.db == nil {
		return nil
	}
	p.logger.Info("exporting all data before shutting down")

	// Persist anything still buffered in the batch so the final export sees it.
	if pending := p.batch; pending != nil {
		if err := pending.Commit(p.wOpts); err != nil {
			return fmt.Errorf("failed to commit batch: %w", err)
		}
		if err := pending.Close(); err != nil {
			return fmt.Errorf("failed to close batch: %w", err)
		}
		p.batch = nil
	}

	// Each aggregation interval has exactly one export candidate at any given
	// time. Align the end time to the end of the current window of every
	// interval and export it, remembering failures instead of stopping early.
	var exportErrs []error
	for _, interval := range p.intervals {
		end := p.processingTime.Truncate(interval.Duration).Add(interval.Duration)
		err := p.export(ctx, end)
		if err == nil {
			continue
		}
		exportErrs = append(exportErrs, fmt.Errorf(
			"failed to export metrics for interval %s: %w", interval.Duration, err),
		)
	}
	// errors.Join yields nil when nothing was collected.
	if joined := errors.Join(exportErrs...); joined != nil {
		return fmt.Errorf("failed while running final export: %w", joined)
	}

	if err := p.db.Close(); err != nil {
		return fmt.Errorf("failed to close database: %w", err)
	}
	// Every further operation on the database is invalid once it is closed.
	p.db = nil
	return nil
}