in appender.go [227:256]
// Add queues document for indexing into index by sending a BulkIndexerItem
// on the appender's internal bulkItems channel.
//
// Add returns errMissingIndex if index is empty and errMissingBody if
// document is nil. It returns ctx.Err() if ctx is done, and ErrClosed if the
// appender's closed channel is readable, either before the item is sent or
// while Add is waiting for room on the channel. On success it returns nil
// after incrementing the docsAdded and docsActive counters.
func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo) error {
	if index == "" {
		return errMissingIndex
	}
	if document == nil {
		return errMissingBody
	}
	// Send the BulkIndexerItem to the internal channel, allowing individual
	// documents to be processed by an active bulk indexer in a dedicated
	// goroutine, improving data locality and minimising lock contention.
	item := BulkIndexerItem{
		Index: index,
		Body:  document,
	}
	// Best-effort snapshot: record that this call is about to block because
	// the channel has no free buffer slot at the time of the check.
	if len(a.bulkItems) == cap(a.bulkItems) {
		a.addCount(1, &a.blockedAdd, a.metrics.blockedAdd)
	}
	// A select with several ready cases picks one at random, so without
	// these non-blocking checks an already-cancelled context or an
	// already-closed appender could still have the item enqueued whenever
	// the channel has spare capacity. Check both up front so that those
	// states are rejected deterministically.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	select {
	case <-a.closed:
		return ErrClosed
	default:
	}
	// Block until the item is accepted, the context is cancelled, or the
	// appender is closed, whichever happens first.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-a.closed:
		return ErrClosed
	case a.bulkItems <- item:
	}
	a.addCount(1, &a.docsAdded, a.metrics.docsAdded)
	a.addUpDownCount(1, &a.docsActive, a.metrics.docsActive)
	return nil
}