in lib/logstash/outputs/opensearch/http_client.rb [92:146]
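
# Serializes a batch of (action, args, source) tuples into bulk-API payloads,
# splits the batch across multiple HTTP requests whenever a body would exceed
# @target_bulk_bytes, and merges the per-request responses into one result.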
def bulk(actions)
  @action_count ||= 0
  @action_count += actions.size
  return if actions.empty?
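
  # Shape each (action, args, source) tuple the way the bulk endpoint
  # expects: 'update' actions are first rewritten by update_action_builder,
  # then every action becomes a metadata hash, paired with its document
  # source when it carries one (deletes never do).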
  bulk_actions = actions.collect do |action, args, source|
    args, source = update_action_builder(args, source) if action == 'update'

    if source && action != 'delete'
      next [ { action => args }, source ]
    else
      next { action => args }
    end
  end
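
  # Buffer the request body in memory; with compression enabled, writes are
  # routed through a gzip writer wrapped around the binary-encoded StringIO.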
  body_stream = StringIO.new
  if http_compression
    body_stream.set_encoding "BINARY"
    stream_writer = gzip_writer(body_stream)
  else
    stream_writer = body_stream
  end
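
  # Walk the serialized actions, flushing a partial bulk request whenever
  # the next action would push the payload past @target_bulk_bytes.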
  bulk_responses = []
  batch_actions = []

  bulk_actions.each_with_index do |action, index|
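    # An Array action is a [metadata, source] pair; each element becomes
    # its own newline-delimited JSON line.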
    as_json = action.is_a?(Array) ?
                action.map { |line| LogStash::Json.dump(line) }.join("\n") :
                LogStash::Json.dump(action)
    as_json << "\n"
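
    # Flush the accumulated payload before appending this action would
    # exceed the byte budget; the pos > 0 guard means a single oversized
    # action is still sent, alone, in its own request.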
    if (stream_writer.pos + as_json.bytesize) > @target_bulk_bytes && stream_writer.pos > 0
      stream_writer.flush
      logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
                   :action_count => batch_actions.size,
                   :payload_size => stream_writer.pos,
                   :content_length => body_stream.size,
                   :batch_offset => (index + 1 - batch_actions.size))
      bulk_responses << bulk_send(body_stream, batch_actions)
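
      # Reset the buffer for the next chunk (truncate returns 0, which is
      # truthy, so the seek always runs) and, when compressing, start a
      # fresh gzip stream.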
      body_stream.truncate(0) && body_stream.seek(0)
      stream_writer = gzip_writer(body_stream) if http_compression
      batch_actions.clear
    end

    stream_writer.write(as_json)
    batch_actions << action
  end
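
  # Closing the gzip writer writes the gzip trailer into body_stream and
  # closes the underlying StringIO with it, which is why body_stream is
  # only closed explicitly below when compression is off.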
  stream_writer.close if http_compression

  logger.debug("Sending final bulk request for batch.",
               :action_count => batch_actions.size,
               :payload_size => stream_writer.pos,
               :content_length => body_stream.size,
               :batch_offset => (actions.size - batch_actions.size))
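
  # Send whatever remains, then merge the per-request responses into a
  # single bulk result for the caller.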
  bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
  body_stream.close unless http_compression

  join_bulk_responses(bulk_responses)
end
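
# For reference, a minimal sketch of the gzip_writer helper the method above
# relies on. This is an assumption based on the Ruby standard library, not a
# copy of the plugin's code: it wraps the passed IO in a Zlib::GzipWriter and
# refuses to start mid-stream. Check http_client.rb for the real definition.

require 'zlib'

def gzip_writer(io)
  # A gzip member must begin at the start of the buffer; anything already
  # written to the IO would end up outside the compressed stream.
  fail(ArgumentError, "Cannot create gzip writer on a non-empty IO") unless io.pos == 0

  io.set_encoding "BINARY"
  Zlib::GzipWriter.new(io, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
end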