func()

in appender.go [166:198]


// Close shuts the Appender down: it closes a.closed, waits for the work
// tracked by a.errgroup, and then flushes every indexer handed back by
// a.pool.Deregister(a.id).
//
// If a.closed is already closed, Close performs no shutdown work and simply
// returns the result of a.errgroup.Wait(). Cancelling ctx invokes
// a.cancelErrgroupContext; the final per-indexer flushes run with
// context.Background() and are therefore not bound to ctx. A non-nil error
// from a.errgroup.Wait() is returned as-is and skips the deregister/flush
// step; otherwise all flush failures are joined into one returned error.
func (a *Appender) Close(ctx context.Context) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	select {
	case <-a.closed:
		// a.closed was closed before this call; only report the errgroup result.
		return a.errgroup.Wait()
	default:
		close(a.closed)
	}

	// Forward cancellation of ctx to the errgroup context (the intent is to
	// abort ongoing flushes/pool.Get() calls). stop is deferred, so the
	// watcher goroutine below always unblocks once Close returns.
	watchCtx, stop := context.WithCancel(ctx)
	defer stop()
	go func() {
		defer a.cancelErrgroupContext(errors.New("cancelled by appender.close"))
		<-watchCtx.Done()
	}()

	if waitErr := a.errgroup.Wait(); waitErr != nil {
		return waitErr
	}

	// Flush whatever each deregistered indexer still holds, collecting every
	// failure instead of stopping at the first one.
	var flushErrs []error
	for indexer := range a.pool.Deregister(a.id) {
		if flushErr := a.flush(context.Background(), indexer); flushErr != nil {
			flushErrs = append(flushErrs, fmt.Errorf("indexer failed: %w", flushErr))
		}
	}
	if len(flushErrs) == 0 {
		return nil
	}
	return fmt.Errorf("failed to flush events on close: %w", errors.Join(flushErrs...))
}