in elasticsearch-model/lib/elasticsearch/model/importing.rb [135:176]
def import(options={}, &block)
errors = []
refresh = options.delete(:refresh) || false
target_index = options.delete(:index) || index_name
transform = options.delete(:transform) || __transform
pipeline = options.delete(:pipeline)
return_value = options.delete(:return) || 'count'
unless transform.respond_to?(:call)
raise ArgumentError,
"Pass an object responding to `call` as the :transform option, #{transform.class} given"
end
if options.delete(:force)
self.create_index! force: true, index: target_index
elsif !self.index_exists? index: target_index
raise ArgumentError,
"#{target_index} does not exist to be imported into. Use create_index! or the :force option to create it."
end
__find_in_batches(options) do |batch|
params = {
index: target_index,
body: __batch_to_bulk(batch, transform)
}
params[:pipeline] = pipeline if pipeline
response = client.bulk params
yield response if block_given?
errors += response['items'].select { |k, v| k.values.first['error'] }
end
self.refresh_index! index: target_index if refresh
case return_value
when 'errors'
errors
else
errors.size
end
end