import

in elasticsearch-model/lib/elasticsearch/model/importing.rb [135:176]


        def import(options={}, &block)
          errors       = []
          refresh      = options.delete(:refresh)   || false
          target_index = options.delete(:index)     || index_name
          transform    = options.delete(:transform) || __transform
          pipeline     = options.delete(:pipeline)
          return_value = options.delete(:return)    || 'count'

          unless transform.respond_to?(:call)
            raise ArgumentError,
                  "Pass an object responding to `call` as the :transform option, #{transform.class} given"
          end

          if options.delete(:force)
            self.create_index! force: true, index: target_index
          elsif !self.index_exists? index: target_index
            raise ArgumentError,
                  "#{target_index} does not exist to be imported into. Use create_index! or the :force option to create it."
          end

          __find_in_batches(options) do |batch|
            params = {
              index: target_index,
              body:  __batch_to_bulk(batch, transform)
            }
            params[:pipeline] = pipeline if pipeline
            response = client.bulk params
            yield response if block_given?

            errors +=  response['items'].select { |k, v| k.values.first['error'] }
          end

          self.refresh_index! index: target_index if refresh

          case return_value
            when 'errors'
              errors
            else
              errors.size
          end
        end