in src/anthropic/_base_client.py [0:0]
def __init__(self, **kwargs: Any) -> None:
kwargs.setdefault("timeout", DEFAULT_TIMEOUT)
kwargs.setdefault("limits", DEFAULT_CONNECTION_LIMITS)
kwargs.setdefault("follow_redirects", True)
if "transport" not in kwargs:
socket_options: List[Tuple[int, int, Union[int, bool]]] = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)]
TCP_KEEPINTVL = getattr(socket, "TCP_KEEPINTVL", None)
if TCP_KEEPINTVL is not None:
socket_options.append((socket.IPPROTO_TCP, TCP_KEEPINTVL, 60))
elif sys.platform == "darwin":
TCP_KEEPALIVE = getattr(socket, "TCP_KEEPALIVE", 0x10)
socket_options.append((socket.IPPROTO_TCP, TCP_KEEPALIVE, 60))
TCP_KEEPCNT = getattr(socket, "TCP_KEEPCNT", None)
if TCP_KEEPCNT is not None:
socket_options.append((socket.IPPROTO_TCP, TCP_KEEPCNT, 5))
TCP_KEEPIDLE = getattr(socket, "TCP_KEEPIDLE", None)
if TCP_KEEPIDLE is not None:
socket_options.append((socket.IPPROTO_TCP, TCP_KEEPIDLE, 60))
kwargs["transport"] = httpx.HTTPTransport(
# note: limits is always set above
limits=kwargs["limits"],
socket_options=socket_options,
)
super().__init__(**kwargs)