in elasticapm/contrib/serverless/aws.py [0:0]
def get_data_from_request(event: dict, capture_body: bool = False, capture_headers: bool = True) -> dict:
"""
Capture context data from API gateway event
"""
result = {}
if capture_headers and "headers" in event:
result["headers"] = event["headers"]
method = (
nested_key(event, "requestContext", "httpMethod")
or nested_key(event, "requestContext", "http", "method")
or nested_key(event, "httpMethod")
)
if not method:
# Not API Gateway
return result
result["method"] = method
if method in constants.HTTP_WITH_BODY and "body" in event:
body = event["body"]
if capture_body:
if event.get("isBase64Encoded"):
body = base64.b64decode(body)
else:
try:
jsonbody = json.loads(body)
body = jsonbody
except Exception:
pass
if body is not None:
result["body"] = body if capture_body else "[REDACTED]"
result["url"] = get_url_dict(event)
return result