bulk

in lib/logstash/outputs/amazon_es/http_client.rb [86:125]


    # Serializes +actions+ as newline-delimited JSON and hands them to
    # bulk_send, splitting the payload into several requests whenever adding
    # the next action would push the pending body past @max_bulk_bytes.
    #
    # The size threshold is measured on the *uncompressed* JSON. When
    # http_compression is enabled every request body is a complete,
    # self-contained gzip member: the writer is finished before each send and
    # a fresh one is opened for the next chunk, so no request ever carries a
    # truncated or header-less gzip stream.
    #
    # A single action larger than @max_bulk_bytes is still sent, alone in its
    # own request.
    #
    # @param actions [Array<Array>] triples of (action, args, source). 'update'
    #   triples are first passed through update_action_builder; the source
    #   line is omitted for 'delete' actions and when source is nil/false.
    # @return [nil] when +actions+ is empty (nothing is sent)
    # @return [Object] otherwise, whatever join_bulk_responses returns for the
    #   collected bulk_send results
    def bulk(actions)
      @action_count ||= 0
      @action_count += actions.size

      return if actions.empty?

      bulk_actions = actions.map do |action, args, source|
        args, source = update_action_builder(args, source) if action == 'update'

        if source && action != 'delete'
          [{ action => args }, source]
        else
          { action => args }
        end
      end

      compress = http_compression
      body_stream = StringIO.new
      body_stream.set_encoding "BINARY" if compress

      # Opens a new gzip member on top of body_stream.
      open_gzip = lambda do
        Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
      end
      stream_writer = compress ? open_gzip.call : body_stream

      bulk_responses = []
      pending_bytes = 0 # uncompressed bytes written since the last send
      bulk_actions.each do |action|
        as_json = action.is_a?(Array) ?
                    action.map { |line| LogStash::Json.dump(line) }.join("\n") :
                    LogStash::Json.dump(action)
        as_json << "\n"

        if pending_bytes > 0 && (pending_bytes + as_json.bytesize) > @max_bulk_bytes
          # finish (unlike close) writes the gzip trailer but leaves the
          # underlying StringIO open so it can be reused for the next chunk.
          stream_writer.finish if compress
          bulk_responses << bulk_send(body_stream)

          # Reset the buffer here rather than relying on bulk_send to do it;
          # repeating the reset is harmless if it already did.
          body_stream.truncate(0)
          body_stream.rewind
          stream_writer = open_gzip.call if compress
          pending_bytes = 0
        end

        stream_writer.write(as_json)
        pending_bytes += as_json.bytesize
      end

      # Closing the gzip writer also closes body_stream; its contents stay
      # readable for the final send.
      stream_writer.close if compress
      bulk_responses << bulk_send(body_stream) if body_stream.size > 0
      body_stream.close unless compress
      join_bulk_responses(bulk_responses)
    end