# frozen_string_literal: true

module Anthropic
  module Internal
    module Transport
      # @api private
      #
      # @abstract
      class BaseClient
        # from whatwg fetch spec
        MAX_REDIRECTS = 20

        # rubocop:disable Style/MutableConstant
        PLATFORM_HEADERS =
          {
            "x-stainless-arch" => Anthropic::Internal::Util.arch,
            "x-stainless-lang" => "ruby",
            "x-stainless-os" => Anthropic::Internal::Util.os,
            "x-stainless-package-version" => Anthropic::VERSION,
            "x-stainless-runtime" => ::RUBY_ENGINE,
            "x-stainless-runtime-version" => ::RUBY_ENGINE_VERSION
          }
        # rubocop:enable Style/MutableConstant

        class << self
          # @api private
          #
          # @param req [Hash{Symbol=>Object}]
          #
          # @raise [ArgumentError]
          def validate!(req)
            keys = [:method, :path, :query, :headers, :body, :unwrap, :page, :stream, :model, :options]
            case req
            in Hash
              req.each_key do |k|
                unless keys.include?(k)
                  raise ArgumentError.new("Request `req` keys must be one of #{keys}, got #{k.inspect}")
                end
              end
            else
              raise ArgumentError.new("Request `req` must be a Hash or RequestOptions, got #{req.inspect}")
            end
          end

          # @api private
          #
          # @param status [Integer]
          # @param headers [Hash{String=>String}, Net::HTTPHeader]
          #
          # @return [Boolean]
          def should_retry?(status, headers:)
            coerced = Anthropic::Internal::Util.coerce_boolean(headers["x-should-retry"])
            case [coerced, status]
            in [true | false, _]
              coerced
            in [_, 408 | 409 | 429 | (500..)]
              # retry on:
              # 408: timeouts
              # 409: locks
              # 429: rate limits
              # 500+: unknown errors
              true
            else
              false
            end
          end

          # @api private
          #
          # @param request [Hash{Symbol=>Object}] .
          #
          #   @option request [Symbol] :method
          #
          #   @option request [URI::Generic] :url
          #
          #   @option request [Hash{String=>String}] :headers
          #
          #   @option request [Object] :body
          #
          #   @option request [Integer] :max_retries
          #
          #   @option request [Float] :timeout
          #
          # @param status [Integer]
          #
          # @param response_headers [Hash{String=>String}, Net::HTTPHeader]
          #
          # @return [Hash{Symbol=>Object}]
          def follow_redirect(request, status:, response_headers:)
            method, url, headers = request.fetch_values(:method, :url, :headers)
            location =
              Kernel.then do
                URI.join(url, response_headers["location"])
              rescue ArgumentError
                message = "Server responded with status #{status} but no valid location header."
                raise Anthropic::Errors::APIConnectionError.new(url: url, message: message)
              end

            request = {**request, url: location}

            case [url.scheme, location.scheme]
            in ["https", "http"]
              message = "Tried to redirect to a insecure URL"
              raise Anthropic::Errors::APIConnectionError.new(url: url, message: message)
            else
              nil
            end

            # from whatwg fetch spec
            case [status, method]
            in [301 | 302, :post] | [303, _]
              drop = %w[content-encoding content-language content-length content-location content-type]
              request = {
                **request,
                method: method == :head ? :head : :get,
                headers: headers.except(*drop),
                body: nil
              }
            else
            end

            # from undici
            if Anthropic::Internal::Util.uri_origin(url) != Anthropic::Internal::Util.uri_origin(location)
              drop = %w[authorization cookie host proxy-authorization]
              request = {**request, headers: request.fetch(:headers).except(*drop)}
            end

            request
          end

          # @api private
          #
          # @param status [Integer, Anthropic::Errors::APIConnectionError]
          # @param stream [Enumerable<String>, nil]
          def reap_connection!(status, stream:)
            case status
            in (..199) | (300..499)
              stream&.each { next }
            in Anthropic::Errors::APIConnectionError | (500..)
              Anthropic::Internal::Util.close_fused!(stream)
            else
            end
          end
        end

        # @api private
        # @return [Anthropic::Internal::Transport::PooledNetRequester]
        attr_accessor :requester

        # @api private
        #
        # @param base_url [String]
        # @param timeout [Float]
        # @param max_retries [Integer]
        # @param initial_retry_delay [Float]
        # @param max_retry_delay [Float]
        # @param headers [Hash{String=>String, Integer, Array<String, Integer, nil>, nil}]
        # @param idempotency_header [String, nil]
        def initialize(
          base_url:,
          timeout: 0.0,
          max_retries: 0,
          initial_retry_delay: 0.0,
          max_retry_delay: 0.0,
          headers: {},
          idempotency_header: nil
        )
          @requester = Anthropic::Internal::Transport::PooledNetRequester.new
          @headers = Anthropic::Internal::Util.normalized_headers(
            self.class::PLATFORM_HEADERS,
            {
              "accept" => "application/json",
              "content-type" => "application/json"
            },
            headers
          )
          @base_url = Anthropic::Internal::Util.parse_uri(base_url)
          @idempotency_header = idempotency_header&.to_s&.downcase
          @max_retries = max_retries
          @timeout = timeout
          @initial_retry_delay = initial_retry_delay
          @max_retry_delay = max_retry_delay
        end

        # @api private
        #
        # @return [Hash{String=>String}]
        private def auth_headers = {}

        # @api private
        #
        # @return [String]
        private def generate_idempotency_key = "stainless-ruby-retry-#{SecureRandom.uuid}"

        # @api private
        #
        # @param req [Hash{Symbol=>Object}] .
        #
        #   @option req [Symbol] :method
        #
        #   @option req [String, Array<String>] :path
        #
        #   @option req [Hash{String=>Array<String>, String, nil}, nil] :query
        #
        #   @option req [Hash{String=>String, Integer, Array<String, Integer, nil>, nil}, nil] :headers
        #
        #   @option req [Object, nil] :body
        #
        #   @option req [Symbol, nil] :unwrap
        #
        #   @option req [Class, nil] :page
        #
        #   @option req [Class, nil] :stream
        #
        #   @option req [Anthropic::Internal::Type::Converter, Class, nil] :model
        #
        # @param opts [Hash{Symbol=>Object}] .
        #
        #   @option opts [String, nil] :idempotency_key
        #
        #   @option opts [Hash{String=>Array<String>, String, nil}, nil] :extra_query
        #
        #   @option opts [Hash{String=>String, nil}, nil] :extra_headers
        #
        #   @option opts [Object, nil] :extra_body
        #
        #   @option opts [Integer, nil] :max_retries
        #
        #   @option opts [Float, nil] :timeout
        #
        # @return [Hash{Symbol=>Object}]
        private def build_request(req, opts)
          method, uninterpolated_path = req.fetch_values(:method, :path)

          path = Anthropic::Internal::Util.interpolate_path(uninterpolated_path)

          query = Anthropic::Internal::Util.deep_merge(req[:query].to_h, opts[:extra_query].to_h)

          headers = Anthropic::Internal::Util.normalized_headers(
            @headers,
            auth_headers,
            req[:headers].to_h,
            opts[:extra_headers].to_h
          )

          if @idempotency_header &&
             !headers.key?(@idempotency_header) &&
             !Net::HTTP::IDEMPOTENT_METHODS_.include?(method.to_s.upcase)
            headers[@idempotency_header] = opts.fetch(:idempotency_key) { generate_idempotency_key }
          end

          unless headers.key?("x-stainless-retry-count")
            headers["x-stainless-retry-count"] = "0"
          end

          timeout = opts.fetch(:timeout, @timeout).to_f.clamp((0..))
          unless headers.key?("x-stainless-timeout") || timeout.zero?
            headers["x-stainless-timeout"] = timeout.to_s
          end

          headers.reject! { |_, v| v.to_s.empty? }

          body =
            case method
            in :get | :head | :options | :trace
              nil
            else
              Anthropic::Internal::Util.deep_merge(*[req[:body], opts[:extra_body]].compact)
            end

          headers, encoded = Anthropic::Internal::Util.encode_content(headers, body)
          {
            method: method,
            url: Anthropic::Internal::Util.join_parsed_uri(@base_url, {**req, path: path, query: query}),
            headers: headers,
            body: encoded,
            max_retries: opts.fetch(:max_retries, @max_retries),
            timeout: timeout
          }
        end

        # @api private
        #
        # @param headers [Hash{String=>String}]
        # @param retry_count [Integer]
        #
        # @return [Float]
        private def retry_delay(headers, retry_count:)
          # Non-standard extension
          span = Float(headers["retry-after-ms"], exception: false)&.then { _1 / 1000 }
          return span if span

          retry_header = headers["retry-after"]
          return span if (span = Float(retry_header, exception: false))

          span = retry_header&.then do
            Time.httpdate(_1) - Time.now
          rescue ArgumentError
            nil
          end
          return span if span

          scale = retry_count**2
          jitter = 1 - (0.25 * rand)
          (@initial_retry_delay * scale * jitter).clamp(0, @max_retry_delay)
        end

        # @api private
        #
        # @param request [Hash{Symbol=>Object}] .
        #
        #   @option request [Symbol] :method
        #
        #   @option request [URI::Generic] :url
        #
        #   @option request [Hash{String=>String}] :headers
        #
        #   @option request [Object] :body
        #
        #   @option request [Integer] :max_retries
        #
        #   @option request [Float] :timeout
        #
        # @param redirect_count [Integer]
        #
        # @param retry_count [Integer]
        #
        # @param send_retry_header [Boolean]
        #
        # @raise [Anthropic::Errors::APIError]
        # @return [Array(Integer, Net::HTTPResponse, Enumerable<String>)]
        private def send_request(request, redirect_count:, retry_count:, send_retry_header:)
          url, headers, max_retries, timeout = request.fetch_values(:url, :headers, :max_retries, :timeout)
          input = {**request.except(:timeout), deadline: Anthropic::Internal::Util.monotonic_secs + timeout}

          if send_retry_header
            headers["x-stainless-retry-count"] = retry_count.to_s
          end

          begin
            status, response, stream = @requester.execute(input)
          rescue Anthropic::Errors::APIConnectionError => e
            status = e
          end

          case status
          in ..299
            [status, response, stream]
          in 300..399 if redirect_count >= self.class::MAX_REDIRECTS
            self.class.reap_connection!(status, stream: stream)

            message = "Failed to complete the request within #{self.class::MAX_REDIRECTS} redirects."
            raise Anthropic::Errors::APIConnectionError.new(url: url, message: message)
          in 300..399
            self.class.reap_connection!(status, stream: stream)

            request = self.class.follow_redirect(request, status: status, response_headers: response)
            send_request(
              request,
              redirect_count: redirect_count + 1,
              retry_count: retry_count,
              send_retry_header: send_retry_header
            )
          in Anthropic::Errors::APIConnectionError if retry_count >= max_retries
            raise status
          in (400..) if retry_count >= max_retries || !self.class.should_retry?(status, headers: response)
            decoded = Kernel.then do
              Anthropic::Internal::Util.decode_content(response, stream: stream, suppress_error: true)
            ensure
              self.class.reap_connection!(status, stream: stream)
            end

            raise Anthropic::Errors::APIStatusError.for(
              url: url,
              status: status,
              body: decoded,
              request: nil,
              response: response
            )
          in (400..) | Anthropic::Errors::APIConnectionError
            self.class.reap_connection!(status, stream: stream)

            delay = retry_delay(response || {}, retry_count: retry_count)
            sleep(delay)

            send_request(
              request,
              redirect_count: redirect_count,
              retry_count: retry_count + 1,
              send_retry_header: send_retry_header
            )
          end
        end

        # Execute the request specified by `req`. This is the method that all resource
        # methods call into.
        #
        # @overload request(method, path, query: {}, headers: {}, body: nil, unwrap: nil, page: nil, stream: nil, model: Anthropic::Internal::Type::Unknown, options: {})
        #
        # @param method [Symbol]
        #
        # @param path [String, Array<String>]
        #
        # @param query [Hash{String=>Array<String>, String, nil}, nil]
        #
        # @param headers [Hash{String=>String, Integer, Array<String, Integer, nil>, nil}, nil]
        #
        # @param body [Object, nil]
        #
        # @param unwrap [Symbol, nil]
        #
        # @param page [Class, nil]
        #
        # @param stream [Class, nil]
        #
        # @param model [Anthropic::Internal::Type::Converter, Class, nil]
        #
        # @param options [Anthropic::RequestOptions, Hash{Symbol=>Object}, nil] .
        #
        #   @option options [String, nil] :idempotency_key
        #
        #   @option options [Hash{String=>Array<String>, String, nil}, nil] :extra_query
        #
        #   @option options [Hash{String=>String, nil}, nil] :extra_headers
        #
        #   @option options [Object, nil] :extra_body
        #
        #   @option options [Integer, nil] :max_retries
        #
        #   @option options [Float, nil] :timeout
        #
        # @raise [Anthropic::Errors::APIError]
        # @return [Object]
        def request(req)
          self.class.validate!(req)
          model = req.fetch(:model) { Anthropic::Internal::Type::Unknown }
          opts = req[:options].to_h
          Anthropic::RequestOptions.validate!(opts)
          request = build_request(req.except(:options), opts)
          url = request.fetch(:url)

          # Don't send the current retry count in the headers if the caller modified the header defaults.
          send_retry_header = request.fetch(:headers)["x-stainless-retry-count"] == "0"
          status, response, stream = send_request(
            request,
            redirect_count: 0,
            retry_count: 0,
            send_retry_header: send_retry_header
          )

          decoded = Anthropic::Internal::Util.decode_content(response, stream: stream)
          case req
          in { stream: Class => st }
            st.new(model: model, url: url, status: status, response: response, stream: decoded)
          in { page: Class => page }
            page.new(client: self, req: req, headers: response, page_data: decoded)
          else
            unwrapped = Anthropic::Internal::Util.dig(decoded, req[:unwrap])
            Anthropic::Internal::Type::Converter.coerce(model, unwrapped)
          end
        end

        # @return [String]
        def inspect
          # rubocop:disable Layout/LineLength
          base_url = Anthropic::Internal::Util.unparse_uri(@base_url)
          "#<#{self.class.name}:0x#{object_id.to_s(16)} base_url=#{base_url} max_retries=#{@max_retries} timeout=#{@timeout}>"
          # rubocop:enable Layout/LineLength
        end
      end
    end
  end
end
