in opensearch-transport/lib/opensearch/transport/transport/base.rb [269:373]
def perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block)
raise NoMethodError, 'Implement this method in your transport class' unless block_given?
start = Time.now
tries = 0
reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])
max_retries = if opts.key?(:retry_on_failure)
opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
elsif options.key?(:retry_on_failure)
options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure]
end
params = params.clone
ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }
begin
tries += 1
connection = get_connection or raise Error.new('Cannot get new connection from pool.')
if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
params = connection.connection.params.merge(params.to_hash)
end
url = connection.full_url(path, params)
response = block.call(connection, url)
connection.healthy! if connection.failures > 0
__raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
rescue OpenSearch::Transport::Transport::ServerError => e
if response && @retry_on_status.include?(response.status)
log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
if tries <= (max_retries || DEFAULT_MAX_RETRIES)
retry
else
log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
raise e
end
else
raise e
end
rescue *host_unreachable_exceptions => e
log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"
connection.dead!
if reload_on_failure and tries < connections.all.size
log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
reload_connections! and retry
end
if max_retries
log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
if tries <= max_retries
retry
else
log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
raise e
end
else
raise e
end
rescue Exception => e
log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
raise e
end
duration = Time.now - start
if response.status.to_i >= 300
__log_response method, path, params, body, url, response, nil, 'N/A', duration
__trace method, path, params, connection.connection.headers, body, url, response, nil, 'N/A', duration if tracer
unless ignore.include?(response.status.to_i)
log_fatal "[#{response.status}] #{response.body}"
end
__raise_transport_error response unless ignore.include?(response.status.to_i)
end
json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a'
unless ignore.include?(response.status.to_i)
__log_response method, path, params, body, url, response, json, took, duration
end
__trace method, path, params, connection.connection.headers, body, url, response, nil, 'N/A', duration if tracer
warnings(response.headers['warning']) if response.headers&.[]('warning')
Response.new response.status, json || response.body, response.headers
ensure
@last_request_at = Time.now
end