def set_metadata_and_context()

in elasticapm/contrib/serverless/aws.py [0:0]


    def set_metadata_and_context(self, coldstart: bool) -> None:
        """
        Process the metadata and context fields for this request.

        Builds the ``faas``, ``cloud`` and ``service`` contexts (plus a
        ``message`` context for the ``sqs`` and ``sns`` sources) from
        ``self.event`` and ``self.context``, choosing the field mapping based on
        ``self.source``. The contexts are set with ``elasticapm.set_context``
        and the service/cloud metadata is handed to
        ``self.client.add_extra_metadata``.

        For any other ``self.source`` value the trigger type stays ``"other"``
        and the cloud origin only carries the ``"aws"`` provider.

        :param coldstart: stored as-is under ``faas["coldstart"]``
        :raises KeyError: if the event lacks a field that the branch selected
            by ``self.source`` reads unconditionally
        :raises IndexError: if an ARN has fewer ``:``-separated segments than
            the branch indexes, or ``Records`` is empty
        """
        cloud_context = {"origin": {"provider": "aws"}}
        service_context = {}
        message_context = {}

        # Keep at most the first seven ARN segments; anything after that is
        # dropped (presumably an alias/version qualifier -- confirm).
        function_arn_parts = self.context.invoked_function_arn.split(":")[:7]
        arn = ":".join(function_arn_parts)

        faas = {
            "coldstart": coldstart,
            "trigger": {"type": "other"},
            "execution": self.context.aws_request_id,
            "id": arn,
            "name": os.environ.get("AWS_LAMBDA_FUNCTION_NAME"),
            "version": os.environ.get("AWS_LAMBDA_FUNCTION_VERSION"),
        }
        trigger = faas["trigger"]

        if self.source == "api":
            request_context = self.event["requestContext"]
            domain_name = request_context["domainName"]
            trigger["type"] = "http"
            trigger["request_id"] = request_context["requestId"]
            service_context["origin"] = {
                "name": domain_name,
                "id": request_context["apiId"],
                "version": self.event.get("version", "1.0"),
            }
            # Same event shape is used for two origins; the domain name tells them apart
            origin_service = "lambda url" if ".lambda-url." in domain_name else "api gateway"
            cloud_context["origin"] = {
                "service": {"name": origin_service},
                "account": {"id": request_context["accountId"]},
                "provider": "aws",
            }
        elif self.source == "elb":
            target_group_arn = self.event["requestContext"]["elb"]["targetGroupArn"]
            arn_parts = target_group_arn.split(":")
            trigger["type"] = "http"
            service_context["origin"] = {
                # Second "/"-separated piece of the sixth ARN segment
                "name": arn_parts[5].split("/")[1],
                "id": target_group_arn,
            }
            cloud_context["origin"] = {
                "service": {"name": "elb"},
                "account": {"id": arn_parts[4]},
                "region": arn_parts[3],
                "provider": "aws",
            }
        elif self.source == "sqs":
            # Only the first record of the batch is inspected
            record = self.event["Records"][0]
            source_arn = record["eventSourceARN"]
            arn_parts = source_arn.split(":")
            queue_name = arn_parts[5]
            trigger["type"] = "pubsub"
            trigger["request_id"] = record["messageId"]
            service_context["origin"] = {"name": queue_name, "id": source_arn}
            cloud_context["origin"] = {
                "service": {"name": "sqs"},
                "region": record["awsRegion"],
                "account": {"id": arn_parts[4]},
                "provider": "aws",
            }
            message_context["queue"] = {"name": queue_name}
            if "SentTimestamp" in record["attributes"]:
                # SentTimestamp is compared against epoch milliseconds
                sent_ms = int(record["attributes"]["SentTimestamp"])
                message_context["age"] = {"ms": int((time.time() * 1000) - sent_ms)}
            if self.client.config.capture_body in ("transactions", "all") and "body" in record:
                message_context["body"] = record["body"]
            if self.client.config.capture_headers and record.get("messageAttributes"):
                # Only attributes with a non-empty string value become headers
                headers = {
                    key: attr["stringValue"]
                    for key, attr in record["messageAttributes"].items()
                    if attr and attr.get("stringValue")
                }
                if headers:
                    message_context["headers"] = headers
        elif self.source == "sns":
            # Only the first record of the batch is inspected
            record = self.event["Records"][0]
            sns = record["Sns"]
            topic_arn = sns["TopicArn"]
            arn_parts = topic_arn.split(":")
            topic_name = arn_parts[5]
            trigger["type"] = "pubsub"
            # NOTE(review): the SQS branch uses the per-message id here, while this
            # uses the topic ARN -- confirm which value is intended.
            trigger["request_id"] = topic_arn
            service_context["origin"] = {
                "name": topic_name,
                "id": topic_arn,
                "version": record["EventVersion"],
                # Kept for backwards compatibility; the origin service is also
                # reported on the cloud context below, like every other source.
                "service": {"name": "sns"},
            }
            cloud_context["origin"] = {
                "service": {"name": "sns"},
                "region": arn_parts[3],
                "account": {"id": arn_parts[4]},
                "provider": "aws",
            }
            message_context["queue"] = {"name": topic_name}
            if "Timestamp" in sns:
                # The timestamp format ends in a literal "Z" (UTC), so it must be
                # compared against the current time in UTC, not naive local time;
                # otherwise the age is off by the process's UTC offset.
                published = datetime.datetime.strptime(sns["Timestamp"], r"%Y-%m-%dT%H:%M:%S.%fZ").replace(
                    tzinfo=datetime.timezone.utc
                )
                age = datetime.datetime.now(datetime.timezone.utc) - published
                message_context["age"] = {"ms": int(age.total_seconds() * 1000)}
            if self.client.config.capture_body in ("transactions", "all") and "Message" in sns:
                message_context["body"] = sns["Message"]
            if self.client.config.capture_headers and sns.get("MessageAttributes"):
                # Only attributes typed "String" become headers
                headers = {
                    key: attr.get("Value")
                    for key, attr in sns["MessageAttributes"].items()
                    if attr and attr.get("Type") == "String"
                }
                if headers:
                    message_context["headers"] = headers
        elif self.source == "s3":
            # Only the first record of the batch is inspected
            record = self.event["Records"][0]
            bucket = record["s3"]["bucket"]
            trigger["type"] = "datasource"
            trigger["request_id"] = record["responseElements"]["x-amz-request-id"]
            service_context["origin"] = {
                "name": bucket["name"],
                "id": bucket["arn"],
                "version": record["eventVersion"],
            }
            cloud_context["origin"] = {
                "service": {"name": "s3"},
                "region": record["awsRegion"],
                "provider": "aws",
            }

        service_metadata = {
            "name": self.client.config.service_name,
            "framework": {"name": "AWS Lambda"},
            "runtime": {
                "name": os.environ.get("AWS_EXECUTION_ENV"),
                "version": platform.python_version(),
            },
            "version": self.client.config.service_version,
            "node": {"configured_name": os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")},
        }
        # This is the one piece of metadata that requires deep merging. We add it manually
        # here to avoid having to deep merge in _transport.add_metadata()
        node = nested_key(self.client.get_service_info(), "node")
        if node:
            service_metadata["node"] = node

        metadata = {
            "service": service_metadata,
            "cloud": {
                "provider": "aws",
                "region": os.environ.get("AWS_REGION"),
                "service": {"name": "lambda"},
                # Fifth segment of the function ARN
                "account": {"id": function_arn_parts[4]},
            },
        }

        elasticapm.set_context(cloud_context, "cloud")
        elasticapm.set_context(service_context, "service")
        # faas doesn't actually belong in context, but we handle this in to_dict
        elasticapm.set_context(faas, "faas")
        if message_context:
            elasticapm.set_context(message_context, "message")
        self.client.add_extra_metadata(metadata)