def get_data_from_request()

in elasticapm/contrib/serverless/aws.py [0:0]


def get_data_from_request(event: dict, capture_body: bool = False, capture_headers: bool = True) -> dict:
    """
    Capture context data from API gateway event
    """
    result = {}
    if capture_headers and "headers" in event:
        result["headers"] = event["headers"]

    method = (
        nested_key(event, "requestContext", "httpMethod")
        or nested_key(event, "requestContext", "http", "method")
        or nested_key(event, "httpMethod")
    )

    if not method:
        # Not API Gateway
        return result

    result["method"] = method
    if method in constants.HTTP_WITH_BODY and "body" in event:
        body = event["body"]
        if capture_body:
            if event.get("isBase64Encoded"):
                body = base64.b64decode(body)
            else:
                try:
                    jsonbody = json.loads(body)
                    body = jsonbody
                except Exception:
                    pass

        if body is not None:
            result["body"] = body if capture_body else "[REDACTED]"

    result["url"] = get_url_dict(event)
    return result