in bulk_indexer.go [292:320]
// Add appends one item to the indexer's writer.
//
// An empty item.Action is treated as ActionCreate. Any action other than
// ActionCreate, ActionDelete, ActionIndex or ActionUpdate is rejected with an
// error before anything is written.
//
// For an accepted item, the metadata is emitted through writeMeta, then the
// item body is written to b.writer followed by a single newline. The
// itemsAdded counter is incremented only after both writes succeed.
//
// It returns a wrapped error if writing the body or the trailing newline
// fails. Note that writeMeta has already run by that point, and this method
// does not undo whatever it did.
func (b *BulkIndexer) Add(item BulkIndexerItem) error {
	// Resolve the effective operation, falling back to create when unset.
	op := item.Action
	if op == "" {
		op = ActionCreate
	}

	// Reject anything outside the supported set of actions up front.
	if op != ActionCreate && op != ActionDelete && op != ActionIndex && op != ActionUpdate {
		return fmt.Errorf("%s is not a valid action", op)
	}

	b.writeMeta(item.Index, item.DocumentID, item.Pipeline, op, item.DynamicTemplates, item.RequireDataStream)

	// The body goes directly after the metadata.
	_, err := item.Body.WriteTo(b.writer)
	if err != nil {
		return fmt.Errorf("failed to write bulk indexer item: %w", err)
	}

	// Terminate the body with a newline.
	_, err = b.writer.Write([]byte("\n"))
	if err != nil {
		return fmt.Errorf("failed to write newline: %w", err)
	}

	b.itemsAdded++
	return nil
}