func()

in appender.go [227:256]


// Add queues document for indexing into index by sending a BulkIndexerItem
// to the Appender's internal bulkItems channel.
//
// Add blocks until the channel accepts the item, ctx is done, or the
// Appender's closed channel is closed. It returns:
//   - errMissingIndex if index is empty;
//   - errMissingBody if document is nil;
//   - ErrClosed if the Appender's closed channel is closed before the item
//     is accepted;
//   - ctx.Err() if ctx is done before the item is accepted;
//   - nil once the item has been sent on the channel.
//
// The blockedAdd counter is incremented when the channel is observed to be
// at capacity just before the send is attempted. The docsAdded and
// docsActive counters are incremented only after a successful send.
func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo) error {
	if index == "" {
		return errMissingIndex
	}
	if document == nil {
		return errMissingBody
	}

	// Fail fast if the Appender is already closed or ctx is already done.
	// A select statement picks uniformly at random among its ready cases,
	// so without this check the send below could win whenever the channel
	// has spare capacity, and Add would report success for an item queued
	// after close/cancellation. A close or cancellation that happens after
	// this check is still handled by the blocking select further down.
	select {
	case <-a.closed:
		return ErrClosed
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	// Send the BulkIndexerItem to the internal channel, allowing individual
	// documents to be processed by an active bulk indexer in a dedicated
	// goroutine, improving data locality and minimising lock contention.
	item := BulkIndexerItem{
		Index: index,
		Body:  document,
	}
	// The channel looks full right now, so the send below is expected to
	// block; record that before waiting.
	if len(a.bulkItems) == cap(a.bulkItems) {
		a.addCount(1, &a.blockedAdd, a.metrics.blockedAdd)
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-a.closed:
		return ErrClosed
	case a.bulkItems <- item:
	}
	a.addCount(1, &a.docsAdded, a.metrics.docsAdded)
	a.addUpDownCount(1, &a.docsActive, a.metrics.docsActive)
	return nil
}