aws_xray_sdk/ext/httpx/patch.py (54 lines of code) (raw):
import httpx
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.models import http
from aws_xray_sdk.ext.util import inject_trace_header, get_hostname
def patch():
httpx.Client = _InstrumentedClient
httpx.AsyncClient = _InstrumentedAsyncClient
httpx._api.Client = _InstrumentedClient
class _InstrumentedClient(httpx.Client):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._original_transport = self._transport
self._transport = SyncInstrumentedTransport(self._transport)
class _InstrumentedAsyncClient(httpx.AsyncClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._original_transport = self._transport
self._transport = AsyncInstrumentedTransport(self._transport)
class SyncInstrumentedTransport(httpx.BaseTransport):
def __init__(self, transport: httpx.BaseTransport):
self._wrapped_transport = transport
def handle_request(self, request: httpx.Request) -> httpx.Response:
with xray_recorder.in_subsegment(
get_hostname(str(request.url)), namespace="remote"
) as subsegment:
if subsegment is not None:
subsegment.put_http_meta(http.METHOD, request.method)
subsegment.put_http_meta(
http.URL,
str(request.url.copy_with(password=None, query=None, fragment=None)),
)
inject_trace_header(request.headers, subsegment)
response = self._wrapped_transport.handle_request(request)
if subsegment is not None:
subsegment.put_http_meta(http.STATUS, response.status_code)
return response
class AsyncInstrumentedTransport(httpx.AsyncBaseTransport):
def __init__(self, transport: httpx.AsyncBaseTransport):
self._wrapped_transport = transport
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
async with xray_recorder.in_subsegment_async(
get_hostname(str(request.url)), namespace="remote"
) as subsegment:
if subsegment is not None:
subsegment.put_http_meta(http.METHOD, request.method)
subsegment.put_http_meta(
http.URL,
str(request.url.copy_with(password=None, query=None, fragment=None)),
)
inject_trace_header(request.headers, subsegment)
response = await self._wrapped_transport.handle_async_request(request)
if subsegment is not None:
subsegment.put_http_meta(http.STATUS, response.status_code)
return response