lib/anthropic/internal/transport/base_client.rb (253 lines of code) (raw):
# frozen_string_literal: true
module Anthropic
module Internal
module Transport
# @api private
#
# @abstract
class BaseClient
# Upper bound on the number of redirects followed for a single request (value
# taken from the WHATWG fetch spec). `send_request` raises an
# `Anthropic::Errors::APIConnectionError` once `redirect_count` reaches it.
MAX_REDIRECTS = 20
# Headers describing the SDK and the Ruby runtime it is running on. `initialize`
# passes them as the first argument to `Util.normalized_headers` when building
# the client's default headers.
# NOTE(review): deliberately left unfrozen (hence the RuboCop exemption below) —
# presumably so it can be amended at runtime; confirm before freezing.
# rubocop:disable Style/MutableConstant
PLATFORM_HEADERS =
{
"x-stainless-arch" => Anthropic::Internal::Util.arch,
"x-stainless-lang" => "ruby",
"x-stainless-os" => Anthropic::Internal::Util.os,
"x-stainless-package-version" => Anthropic::VERSION,
"x-stainless-runtime" => ::RUBY_ENGINE,
"x-stainless-runtime-version" => ::RUBY_ENGINE_VERSION
}
# rubocop:enable Style/MutableConstant
class << self
# @api private
#
# Checks that `req` is a Hash and that every one of its keys is a recognised
# request key. Values are not inspected.
#
# @param req [Hash{Symbol=>Object}] request description to check
#
# @raise [ArgumentError] when `req` is not a Hash or contains an unknown key
def validate!(req)
  unless req.is_a?(Hash)
    raise ArgumentError, "Request `req` must be a Hash or RequestOptions, got #{req.inspect}"
  end

  allowed = %i[method path query headers body unwrap page stream model options]
  req.each_key do |key|
    next if allowed.include?(key)

    # Fail on the first unknown key, naming both the key and the accepted set.
    raise ArgumentError, "Request `req` keys must be one of #{allowed}, got #{key.inspect}"
  end
end
# @api private
#
# Decides whether a response with the given status should be retried. An
# `x-should-retry` header that `Util.coerce_boolean` turns into `true` or
# `false` is authoritative; otherwise the decision is based on the status code.
#
# @param status [Integer]
# @param headers [Hash{String=>String}, Net::HTTPHeader]
#
# @return [Boolean]
def should_retry?(status, headers:)
  override = Anthropic::Internal::Util.coerce_boolean(headers["x-should-retry"])
  return override if [true, false].include?(override)

  case status
  when 408, 409, 429, (500..)
    # retry on:
    # 408: timeouts
    # 409: locks
    # 429: rate limits
    # 500+: unknown errors
    true
  else
    false
  end
end
# @api private
#
# Derives the request to issue after a redirect response. The redirect target
# is the `location` response header resolved against the current URL. The
# input `request` Hash is not modified; an updated copy is returned.
#
# @param request [Hash{Symbol=>Object}] the request that was just answered with a redirect
#
# @option request [Symbol] :method
#
# @option request [URI::Generic] :url
#
# @option request [Hash{String=>String}] :headers
#
# @option request [Object] :body
#
# @option request [Integer] :max_retries
#
# @option request [Float] :timeout
#
# @param status [Integer] the redirect status code that was received
#
# @param response_headers [Hash{String=>String}, Net::HTTPHeader]
#
# @raise [Anthropic::Errors::APIConnectionError] when the `location` header is
#   missing or cannot be parsed, or when an https URL redirects to http
#
# @return [Hash{Symbol=>Object}]
def follow_redirect(request, status:, response_headers:)
  method, url, headers = request.fetch_values(:method, :url, :headers)

  location =
    begin
      URI.join(url, response_headers["location"])
    rescue ArgumentError, URI::Error
      # ArgumentError: the header is absent (nil).
      # URI::Error: the header is present but is not a parsable URI reference.
      message = "Server responded with status #{status} but no valid location header."
      raise Anthropic::Errors::APIConnectionError.new(url: url, message: message)
    end

  # Never downgrade from an encrypted to a plaintext connection.
  if url.scheme == "https" && location.scheme == "http"
    message = "Tried to redirect to a insecure URL"
    raise Anthropic::Errors::APIConnectionError.new(url: url, message: message)
  end

  request = {**request, url: location}

  # from whatwg fetch spec
  case [status, method]
  in [301 | 302, :post] | [303, _]
    # The follow-up request carries no body, so headers describing one are dropped.
    drop = %w[content-encoding content-language content-length content-location content-type]
    request = {
      **request,
      method: method == :head ? :head : :get,
      headers: headers.except(*drop),
      body: nil
    }
  else
    # Every other status/method combination keeps method, headers and body.
  end

  # from undici
  if Anthropic::Internal::Util.uri_origin(url) != Anthropic::Internal::Util.uri_origin(location)
    # Credentials and host-specific headers must not be sent to a different origin.
    drop = %w[authorization cookie host proxy-authorization]
    request = {**request, headers: request.fetch(:headers).except(*drop)}
  end

  request
end
# @api private
#
# Disposes of a response body that will not be handed back to the caller.
# For statuses below 200 and in 300..499 the stream is iterated to its end;
# for statuses of 500 and above, or a connection error, it is passed to
# `Util.close_fused!`. Statuses 200..299 are left untouched.
#
# @param status [Integer, Anthropic::Errors::APIConnectionError]
# @param stream [Enumerable<String>, nil]
def reap_connection!(status, stream:)
  case status
  when (..199), (300..499)
    stream&.each { next }
  when Anthropic::Errors::APIConnectionError, (500..)
    Anthropic::Internal::Util.close_fused!(stream)
  end
end
end
# @api private
#
# Transport object whose `execute` method is called by `send_request` to
# perform the HTTP exchange. `initialize` assigns a new `PooledNetRequester`.
# NOTE(review): exposed with a writer as well — presumably so the transport can
# be swapped out (e.g. in tests); confirm.
#
# @return [Anthropic::Internal::Transport::PooledNetRequester]
attr_accessor :requester
# @api private
#
# Sets up the transport, the default headers and the retry/timeout settings
# shared by every request made through this client.
#
# @param base_url [String] parsed with `Util.parse_uri`; request paths are joined onto it
# @param timeout [Float] default per-request timeout, overridable per call via `opts[:timeout]`
# @param max_retries [Integer] default retry budget, overridable per call via `opts[:max_retries]`
# @param initial_retry_delay [Float] base of the backoff computed by `retry_delay`
# @param max_retry_delay [Float] upper bound of the backoff computed by `retry_delay`
# @param headers [Hash{String=>String, Integer, Array<String, Integer, nil>, nil}]
#   extra default headers, normalized together with the platform and JSON defaults
# @param idempotency_header [String, nil] name of the idempotency header; stored lower-cased
def initialize(
  base_url:,
  timeout: 0.0,
  max_retries: 0,
  initial_retry_delay: 0.0,
  max_retry_delay: 0.0,
  headers: {},
  idempotency_header: nil
)
  @requester = Anthropic::Internal::Transport::PooledNetRequester.new

  json_defaults = {"accept" => "application/json", "content-type" => "application/json"}
  @headers = Anthropic::Internal::Util.normalized_headers(self.class::PLATFORM_HEADERS, json_defaults, headers)

  @base_url = Anthropic::Internal::Util.parse_uri(base_url)
  @idempotency_header = idempotency_header&.to_s&.downcase

  @max_retries = max_retries
  @timeout = timeout
  @initial_retry_delay = initial_retry_delay
  @max_retry_delay = max_retry_delay
end
# @api private
#
# Authentication headers that `build_request` merges into every outgoing
# request. This implementation contributes none.
# NOTE(review): the class is marked @abstract, so concrete clients presumably
# override this — confirm.
#
# @return [Hash{String=>String}]
private def auth_headers
  {}
end
# @api private
#
# Produces the idempotency key `build_request` uses when the caller did not
# supply one: a fixed prefix followed by a random UUID.
#
# @return [String]
private def generate_idempotency_key
  "stainless-ruby-retry-#{SecureRandom.uuid}"
end
# @api private
#
# Turns a resource-level request description plus per-call options into the
# low-level request Hash that `send_request` consumes.
#
# Headers are built by passing, in this order, the client defaults,
# `auth_headers`, `req[:headers]` and `opts[:extra_headers]` to
# `Util.normalized_headers` (NOTE(review): presumably later arguments win —
# confirm against that helper). Headers whose value is empty are dropped
# before the body is encoded.
#
# @param req [Hash{Symbol=>Object}] request description
#
# @option req [Symbol] :method
#
# @option req [String, Array<String>] :path
#
# @option req [Hash{String=>Array<String>, String, nil}, nil] :query
#
# @option req [Hash{String=>String, Integer, Array<String, Integer, nil>, nil}, nil] :headers
#
# @option req [Object, nil] :body
#
# @option req [Symbol, nil] :unwrap
#
# @option req [Class, nil] :page
#
# @option req [Class, nil] :stream
#
# @option req [Anthropic::Internal::Type::Converter, Class, nil] :model
#
# @param opts [Hash{Symbol=>Object}] per-call overrides
#
# @option opts [String, nil] :idempotency_key
#
# @option opts [Hash{String=>Array<String>, String, nil}, nil] :extra_query
#
# @option opts [Hash{String=>String, nil}, nil] :extra_headers
#
# @option opts [Object, nil] :extra_body
#
# @option opts [Integer, nil] :max_retries
#
# @option opts [Float, nil] :timeout
#
# @return [Hash{Symbol=>Object}] with keys :method, :url, :headers, :body, :max_retries, :timeout
private def build_request(req, opts)
  verb, raw_path = req.fetch_values(:method, :path)
  util = Anthropic::Internal::Util

  path = util.interpolate_path(raw_path)
  query = util.deep_merge(req[:query].to_h, opts[:extra_query].to_h)
  headers = util.normalized_headers(@headers, auth_headers, req[:headers].to_h, opts[:extra_headers].to_h)

  # Only non-idempotent methods get an idempotency key, and only when an
  # idempotency header is configured and not already present.
  wants_idempotency_key =
    @idempotency_header &&
    !headers.key?(@idempotency_header) &&
    !Net::HTTP::IDEMPOTENT_METHODS_.include?(verb.to_s.upcase)
  if wants_idempotency_key
    headers[@idempotency_header] = opts.fetch(:idempotency_key) { generate_idempotency_key }
  end

  headers["x-stainless-retry-count"] = "0" unless headers.key?("x-stainless-retry-count")

  # Negative timeouts are raised to zero; a zero timeout is not advertised.
  timeout = opts.fetch(:timeout, @timeout).to_f.clamp((0..))
  if !headers.key?("x-stainless-timeout") && !timeout.zero?
    headers["x-stainless-timeout"] = timeout.to_s
  end

  headers.reject! { |_name, value| value.to_s.empty? }

  # These methods are sent without a body, whatever the caller passed.
  payload =
    if %i[get head options trace].include?(verb)
      nil
    else
      util.deep_merge(*[req[:body], opts[:extra_body]].compact)
    end

  headers, encoded = util.encode_content(headers, payload)
  url = util.join_parsed_uri(@base_url, {**req, path: path, query: query})

  {
    method: verb,
    url: url,
    headers: headers,
    body: encoded,
    max_retries: opts.fetch(:max_retries, @max_retries),
    timeout: timeout
  }
end
# @api private
#
# Computes how long to wait before the next retry attempt.
#
# A delay supplied by the server takes precedence, checked in this order:
# the `retry-after-ms` header (milliseconds), then `retry-after` as a number of
# seconds, then `retry-after` as an HTTP-date. A server-supplied delay is never
# negative: values below zero (including dates already in the past) become 0.0.
#
# Without a usable header the delay is
# `@initial_retry_delay * retry_count**2`, reduced by up to 25% of random
# jitter and limited to the range 0..`@max_retry_delay`. Note that this is zero
# when `retry_count` is 0.
#
# @param headers [Hash{String=>String}]
# @param retry_count [Integer] number of retries already performed
#
# @return [Float] seconds to wait
private def retry_delay(headers, retry_count:)
  # Non-standard extension
  span = Float(headers["retry-after-ms"], exception: false)&.then { _1 / 1000 }

  if span.nil?
    retry_header = headers["retry-after"]
    span = Float(retry_header, exception: false)
    span ||= retry_header&.then do
      Time.httpdate(_1) - Time.now
    rescue ArgumentError
      # Neither a number nor an HTTP-date: fall through to computed backoff.
      nil
    end
  end

  # The result is passed to `sleep`, which raises ArgumentError for negative
  # intervals; a negative header value or a past date means "retry now".
  return [span, 0.0].max if span

  scale = retry_count**2
  jitter = 1 - (0.25 * rand)
  (@initial_retry_delay * scale * jitter).clamp(0, @max_retry_delay)
end
# @api private
#
# Executes a prepared request through `@requester`, following redirects and
# retrying failed attempts by calling itself recursively. Returns only for
# statuses up to 299; every other outcome either recurses or raises.
#
# @param request [Hash{Symbol=>Object}] prepared request, as returned by `build_request`
#
# @option request [Symbol] :method
#
# @option request [URI::Generic] :url
#
# @option request [Hash{String=>String}] :headers
#
# @option request [Object] :body
#
# @option request [Integer] :max_retries
#
# @option request [Float] :timeout
#
# @param redirect_count [Integer] redirects already followed for this request
#
# @param retry_count [Integer] retries already performed for this request
#
# @param send_retry_header [Boolean] whether to overwrite the
#   `x-stainless-retry-count` header with the current `retry_count`
#
# @raise [Anthropic::Errors::APIError]
# @return [Array(Integer, Net::HTTPResponse, Enumerable<String>)]
private def send_request(request, redirect_count:, retry_count:, send_retry_header:)
url, headers, max_retries, timeout = request.fetch_values(:url, :headers, :max_retries, :timeout)
# The requester receives an absolute deadline instead of the relative timeout.
input = {**request.except(:timeout), deadline: Anthropic::Internal::Util.monotonic_secs + timeout}
if send_retry_header
# `input` references this same headers Hash, so the update below is part of
# what gets sent.
headers["x-stainless-retry-count"] = retry_count.to_s
end
begin
status, response, stream = @requester.execute(input)
rescue Anthropic::Errors::APIConnectionError => e
# Treat the connection error as the "status" so that it goes through the same
# dispatch below; `response` and `stream` remain nil in this case.
status = e
end
# Branch order matters: the guarded branches must be tried before the
# unguarded ones that cover the same statuses.
case status
in ..299
[status, response, stream]
in 300..399 if redirect_count >= self.class::MAX_REDIRECTS
# Redirect budget exhausted: dispose of the body and give up.
self.class.reap_connection!(status, stream: stream)
message = "Failed to complete the request within #{self.class::MAX_REDIRECTS} redirects."
raise Anthropic::Errors::APIConnectionError.new(url: url, message: message)
in 300..399
# Follow the redirect; this does not consume a retry.
self.class.reap_connection!(status, stream: stream)
request = self.class.follow_redirect(request, status: status, response_headers: response)
send_request(
request,
redirect_count: redirect_count + 1,
retry_count: retry_count,
send_retry_header: send_retry_header
)
in Anthropic::Errors::APIConnectionError if retry_count >= max_retries
# Retries exhausted on a connection failure: re-raise the captured error.
raise status
in (400..) if retry_count >= max_retries || !self.class.should_retry?(status, headers: response)
# Final error response: decode the body for the exception (decode failures
# are suppressed) and always dispose of the stream afterwards.
decoded = Kernel.then do
Anthropic::Internal::Util.decode_content(response, stream: stream, suppress_error: true)
ensure
self.class.reap_connection!(status, stream: stream)
end
raise Anthropic::Errors::APIStatusError.for(
url: url,
status: status,
body: decoded,
request: nil,
response: response
)
in (400..) | Anthropic::Errors::APIConnectionError
# Retryable failure: dispose of the body, wait, then try again.
self.class.reap_connection!(status, stream: stream)
# `response` is nil after a connection error, hence the empty Hash fallback.
delay = retry_delay(response || {}, retry_count: retry_count)
sleep(delay)
send_request(
request,
redirect_count: redirect_count,
retry_count: retry_count + 1,
send_retry_header: send_retry_header
)
end
end
# Execute the request specified by `req`. This is the method that all resource
# methods call into.
#
# The request is validated, built, sent (with redirects and retries) and its
# body decoded. The result is then wrapped according to `req`: an instance of
# the `:stream` class when one is given, otherwise an instance of the `:page`
# class when one is given, otherwise the decoded body reduced via `:unwrap`
# and coerced to `:model`.
#
# @overload request(method, path, query: {}, headers: {}, body: nil, unwrap: nil, page: nil, stream: nil, model: Anthropic::Internal::Type::Unknown, options: {})
#
# @param method [Symbol]
#
# @param path [String, Array<String>]
#
# @param query [Hash{String=>Array<String>, String, nil}, nil]
#
# @param headers [Hash{String=>String, Integer, Array<String, Integer, nil>, nil}, nil]
#
# @param body [Object, nil]
#
# @param unwrap [Symbol, nil]
#
# @param page [Class, nil]
#
# @param stream [Class, nil]
#
# @param model [Anthropic::Internal::Type::Converter, Class, nil]
#
# @param options [Anthropic::RequestOptions, Hash{Symbol=>Object}, nil]
#
# @option options [String, nil] :idempotency_key
#
# @option options [Hash{String=>Array<String>, String, nil}, nil] :extra_query
#
# @option options [Hash{String=>String, nil}, nil] :extra_headers
#
# @option options [Object, nil] :extra_body
#
# @option options [Integer, nil] :max_retries
#
# @option options [Float, nil] :timeout
#
# @raise [Anthropic::Errors::APIError]
# @return [Object]
def request(req)
  self.class.validate!(req)
  model = req.fetch(:model) { Anthropic::Internal::Type::Unknown }
  opts = req[:options].to_h
  Anthropic::RequestOptions.validate!(opts)

  prepared = build_request(req.except(:options), opts)
  url = prepared.fetch(:url)

  # Don't send the current retry count in the headers if the caller modified the header defaults.
  retry_header_untouched = prepared.fetch(:headers)["x-stainless-retry-count"] == "0"

  status, response, stream = send_request(
    prepared,
    redirect_count: 0,
    retry_count: 0,
    send_retry_header: retry_header_untouched
  )
  decoded = Anthropic::Internal::Util.decode_content(response, stream: stream)

  if req.key?(:stream) && req[:stream].is_a?(Class)
    req[:stream].new(model: model, url: url, status: status, response: response, stream: decoded)
  elsif req.key?(:page) && req[:page].is_a?(Class)
    req[:page].new(client: self, req: req, headers: response, page_data: decoded)
  else
    unwrapped = Anthropic::Internal::Util.dig(decoded, req[:unwrap])
    Anthropic::Internal::Type::Converter.coerce(model, unwrapped)
  end
end
# @return [String]
def inspect
# rubocop:disable Layout/LineLength
base_url = Anthropic::Internal::Util.unparse_uri(@base_url)
"#<#{self.class.name}:0x#{object_id.to_s(16)} base_url=#{base_url} max_retries=#{@max_retries} timeout=#{@timeout}>"
# rubocop:enable Layout/LineLength
end
end
end
end
end