bulk

in lib/logstash/outputs/opensearch/http_client.rb [92:146]


    def bulk(actions)
      @action_count ||= 0
      @action_count += actions.size
      return if actions.empty?

      bulk_actions = actions.collect do |action, args, source|
        args, source = update_action_builder(args, source) if action == 'update'

        if source && action != 'delete'
          next [ { action => args }, source ]
        else
          next { action => args }
        end
      end

      body_stream = StringIO.new
      if http_compression
        body_stream.set_encoding "BINARY"
        stream_writer = gzip_writer(body_stream)
      else
        stream_writer = body_stream
      end
      bulk_responses = []
      batch_actions = []
      bulk_actions.each_with_index do |action, index|
        as_json = action.is_a?(Array) ?
                    action.map {|line| LogStash::Json.dump(line)}.join("\n") :
                    LogStash::Json.dump(action)
        as_json << "\n"
        if (stream_writer.pos + as_json.bytesize) > @target_bulk_bytes && stream_writer.pos > 0
          stream_writer.flush 
          logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
                       :action_count => batch_actions.size,
                       :payload_size => stream_writer.pos,
                       :content_length => body_stream.size,
                       :batch_offset => (index + 1 - batch_actions.size))
          bulk_responses << bulk_send(body_stream, batch_actions)
          body_stream.truncate(0) && body_stream.seek(0)
          stream_writer = gzip_writer(body_stream) if http_compression
          batch_actions.clear
        end
        stream_writer.write(as_json)
        batch_actions << action
      end
      stream_writer.close if http_compression
      logger.debug("Sending final bulk request for batch.",
                   :action_count => batch_actions.size,
                   :payload_size => stream_writer.pos,
                   :content_length => body_stream.size,
                   :batch_offset => (actions.size - batch_actions.size))
      bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
      body_stream.close if !http_compression
      join_bulk_responses(bulk_responses)
    end