lib/connectors/base/custom_client.rb (88 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
require 'faraday'
require 'httpclient'
require 'active_support/core_ext/array/wrap'
require 'active_support/core_ext/numeric/time'
require 'active_support/core_ext/object/deep_dup'
require 'utility'
require 'date'
module Connectors
module Base
class CustomClient
attr_reader :base_url, :middleware, :ensure_fresh_auth
MAX_RETRIES = 5
def initialize(base_url: nil, ensure_fresh_auth: nil)
@base_url = base_url
@ensure_fresh_auth = ensure_fresh_auth
middleware!
end
def middleware!
@middleware = Array.wrap(additional_middleware)
@middleware += Array.wrap(default_middleware)
@middleware.compact!
end
def additional_middleware
[] # define as needed in subclass
end
def default_middleware
[[Faraday::Request::Retry, retry_config]]
end
def retry_config
{
:retry_statuses => [429],
:backoff_factor => 2,
:max => MAX_RETRIES,
:interval => 0.05
}
end
[
:delete,
:get,
:head,
:options,
:patch,
:post,
:put,
].each do |http_verb|
define_method http_verb do |*args, &block|
ensure_fresh_auth.call(self) if ensure_fresh_auth.present?
http_client.public_send(http_verb, *args, &block)
end
end
def http_client!
@http_client = nil
http_client
end
def http_client
@http_client ||= Faraday.new(base_url) do |faraday|
middleware.each do |middleware_config|
faraday.use(*middleware_config)
end
faraday.adapter :httpclient
end
end
private
# https://github.com/lostisland/faraday/blob/b09c6db31591dd1a58fffcc0979b0c7d96b5388b/lib/faraday/connection.rb#L171
METHODS_WITH_BODY = [:post, :put, :patch].freeze
def send_body?(method)
METHODS_WITH_BODY.include?(method.to_sym)
end
def request_with_throttling(method, url, options = {})
response =
if send_body?(method)
public_send(method, url, options[:body], options[:headers])
else
public_send(method, url, options[:params], options[:headers])
end
if response.status == 429
retry_after = response.headers['Retry-After']
multiplier = options.fetch(:retry_mulitplier, 1)
retry_after_secs = (retry_after.is_a?(Array) ? retry_after.first.to_i : retry_after.to_i) * multiplier
retry_after_secs = 60 if retry_after_secs <= 0
Utility::Logger.warn("Exceeded #{self.class} request limits. Going to sleep for #{retry_after_secs} seconds")
raise Utility::ThrottlingError.new(:suspend_until => DateTime.now + retry_after_secs.seconds, :cursors => options[:cursors])
else
response
end
end
end
end
end