in appender.go [166:198]
// Close shuts the appender down while holding a.mu.
//
// The first call closes a.closed, waits on a.errgroup, and, if that wait
// succeeds, flushes every item yielded by a.pool.Deregister(a.id). Later
// calls see a.closed already closed and only return a.errgroup.Wait().
//
// While the first call is in progress, cancelling ctx invokes
// a.cancelErrgroupContext; the same call is also made once Close returns,
// because the derived context is cancelled by a deferred stop. The final
// flushes run with context.Background(), so they are not bound to ctx.
//
// It returns the errgroup error if there is one; otherwise it returns the
// joined flush errors, or nil when every flush succeeded.
func (a *Appender) Close(ctx context.Context) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	select {
	case <-a.closed:
		// A previous Close already ran; just report the errgroup outcome.
		return a.errgroup.Wait()
	default:
		close(a.closed)
	}

	// Watch a derived context so that cancellation of ctx (or Close
	// returning) is forwarded to the errgroup context.
	watchCtx, stopWatch := context.WithCancel(ctx)
	defer stopWatch()
	cause := errors.New("cancelled by appender.close")
	go func() {
		defer a.cancelErrgroupContext(cause)
		<-watchCtx.Done()
	}()

	if waitErr := a.errgroup.Wait(); waitErr != nil {
		return waitErr
	}

	// Flush whatever the pool still holds for this appender, collecting
	// every failure instead of stopping at the first one.
	var flushErrs []error
	for indexer := range a.pool.Deregister(a.id) {
		if flushErr := a.flush(context.Background(), indexer); flushErr != nil {
			flushErrs = append(flushErrs, fmt.Errorf("indexer failed: %w", flushErr))
		}
	}
	if len(flushErrs) == 0 {
		return nil
	}
	return fmt.Errorf("failed to flush events on close: %w", errors.Join(flushErrs...))
}