aws_xray_sdk/ext/pynamodb/patch.py (62 lines of code) (raw):
import json
import wrapt
import pynamodb
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.models import http
from aws_xray_sdk.ext.boto_utils import _extract_whitelisted_params
PYNAMODB4 = int(pynamodb.__version__.split('.')[0]) >= 4
if PYNAMODB4:
import botocore.httpsession
else:
import botocore.vendored.requests.sessions
def patch():
"""Patch PynamoDB so it generates subsegements when calling DynamoDB."""
if PYNAMODB4:
if hasattr(botocore.httpsession, '_xray_enabled'):
return
setattr(botocore.httpsession, '_xray_enabled', True)
module = 'botocore.httpsession'
name = 'URLLib3Session.send'
else:
if hasattr(botocore.vendored.requests.sessions, '_xray_enabled'):
return
setattr(botocore.vendored.requests.sessions, '_xray_enabled', True)
module = 'botocore.vendored.requests.sessions'
name = 'Session.send'
wrapt.wrap_function_wrapper(
module, name, _xray_traced_pynamodb,
)
def _xray_traced_pynamodb(wrapped, instance, args, kwargs):
# Check if it's a request to DynamoDB and return otherwise.
try:
service = args[0].headers['X-Amz-Target'].decode('utf-8').split('_')[0]
except KeyError:
return wrapped(*args, **kwargs)
if service.lower() != 'dynamodb':
return wrapped(*args, **kwargs)
return xray_recorder.record_subsegment(
wrapped, instance, args, kwargs,
name='dynamodb',
namespace='aws',
meta_processor=pynamodb_meta_processor,
)
def pynamodb_meta_processor(wrapped, instance, args, kwargs, return_value,
exception, subsegment, stack):
operation_name = args[0].headers['X-Amz-Target'].decode('utf-8').split('.')[1]
region = args[0].url.split('.')[1]
aws_meta = {
'operation': operation_name,
'region': region
}
# in case of client timeout the return value will be empty
if return_value is not None:
aws_meta['request_id'] = return_value.headers.get('x-amzn-RequestId')
subsegment.put_http_meta(http.STATUS, return_value.status_code)
if exception:
subsegment.add_error_flag()
subsegment.add_exception(exception, stack, True)
if PYNAMODB4:
resp = json.loads(return_value.text) if return_value else None
else:
resp = return_value.json() if return_value else None
_extract_whitelisted_params(subsegment.name, operation_name, aws_meta,
[None, json.loads(args[0].body.decode('utf-8'))],
None, resp)
subsegment.set_aws(aws_meta)