in elasticapm/contrib/serverless/aws.py [0:0]
def set_metadata_and_context(self, coldstart: bool) -> None:
"""
Process the metadata and context fields for this request
"""
metadata = {}
cloud_context = {"origin": {"provider": "aws"}}
service_context = {}
message_context = {}
faas = {}
faas["coldstart"] = coldstart
faas["trigger"] = {"type": "other"}
faas["execution"] = self.context.aws_request_id
arn = self.context.invoked_function_arn
if len(arn.split(":")) > 7:
arn = ":".join(arn.split(":")[:7])
faas["id"] = arn
faas["name"] = os.environ.get("AWS_LAMBDA_FUNCTION_NAME")
faas["version"] = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION")
if self.source == "api":
faas["trigger"]["type"] = "http"
faas["trigger"]["request_id"] = self.event["requestContext"]["requestId"]
service_context["origin"] = {"name": self.event["requestContext"]["domainName"]}
service_context["origin"]["id"] = self.event["requestContext"]["apiId"]
service_context["origin"]["version"] = self.event.get("version", "1.0")
cloud_context["origin"] = {}
cloud_context["origin"]["service"] = {"name": "api gateway"}
if ".lambda-url." in self.event["requestContext"]["domainName"]:
cloud_context["origin"]["service"]["name"] = "lambda url"
cloud_context["origin"]["account"] = {"id": self.event["requestContext"]["accountId"]}
cloud_context["origin"]["provider"] = "aws"
elif self.source == "elb":
elb_target_group_arn = self.event["requestContext"]["elb"]["targetGroupArn"]
faas["trigger"]["type"] = "http"
service_context["origin"] = {"name": elb_target_group_arn.split(":")[5].split("/")[1]}
service_context["origin"]["id"] = elb_target_group_arn
cloud_context["origin"] = {}
cloud_context["origin"]["service"] = {"name": "elb"}
cloud_context["origin"]["account"] = {"id": elb_target_group_arn.split(":")[4]}
cloud_context["origin"]["region"] = elb_target_group_arn.split(":")[3]
cloud_context["origin"]["provider"] = "aws"
elif self.source == "sqs":
record = self.event["Records"][0]
faas["trigger"]["type"] = "pubsub"
faas["trigger"]["request_id"] = record["messageId"]
service_context["origin"] = {}
service_context["origin"]["name"] = record["eventSourceARN"].split(":")[5]
service_context["origin"]["id"] = record["eventSourceARN"]
cloud_context["origin"] = {}
cloud_context["origin"]["service"] = {"name": "sqs"}
cloud_context["origin"]["region"] = record["awsRegion"]
cloud_context["origin"]["account"] = {"id": record["eventSourceARN"].split(":")[4]}
cloud_context["origin"]["provider"] = "aws"
message_context["queue"] = {"name": service_context["origin"]["name"]}
if "SentTimestamp" in record["attributes"]:
message_context["age"] = {"ms": int((time.time() * 1000) - int(record["attributes"]["SentTimestamp"]))}
if self.client.config.capture_body in ("transactions", "all") and "body" in record:
message_context["body"] = record["body"]
if self.client.config.capture_headers and record.get("messageAttributes"):
headers = {}
for k, v in record["messageAttributes"].items():
if v and v.get("stringValue"):
headers[k] = v.get("stringValue")
if headers:
message_context["headers"] = headers
elif self.source == "sns":
record = self.event["Records"][0]
faas["trigger"]["type"] = "pubsub"
faas["trigger"]["request_id"] = record["Sns"]["TopicArn"]
service_context["origin"] = {}
service_context["origin"]["name"] = record["Sns"]["TopicArn"].split(":")[5]
service_context["origin"]["id"] = record["Sns"]["TopicArn"]
service_context["origin"]["version"] = record["EventVersion"]
service_context["origin"]["service"] = {"name": "sns"}
cloud_context["origin"] = {}
cloud_context["origin"]["region"] = record["Sns"]["TopicArn"].split(":")[3]
cloud_context["origin"]["account"] = {"id": record["Sns"]["TopicArn"].split(":")[4]}
cloud_context["origin"]["provider"] = "aws"
message_context["queue"] = {"name": service_context["origin"]["name"]}
if "Timestamp" in record["Sns"]:
message_context["age"] = {
"ms": int(
(
datetime.datetime.now()
- datetime.datetime.strptime(record["Sns"]["Timestamp"], r"%Y-%m-%dT%H:%M:%S.%fZ")
).total_seconds()
* 1000
)
}
if self.client.config.capture_body in ("transactions", "all") and "Message" in record["Sns"]:
message_context["body"] = record["Sns"]["Message"]
if self.client.config.capture_headers and record["Sns"].get("MessageAttributes"):
headers = {}
for k, v in record["Sns"]["MessageAttributes"].items():
if v and v.get("Type") == "String":
headers[k] = v.get("Value")
if headers:
message_context["headers"] = headers
elif self.source == "s3":
record = self.event["Records"][0]
faas["trigger"]["type"] = "datasource"
faas["trigger"]["request_id"] = record["responseElements"]["x-amz-request-id"]
service_context["origin"] = {}
service_context["origin"]["name"] = record["s3"]["bucket"]["name"]
service_context["origin"]["id"] = record["s3"]["bucket"]["arn"]
service_context["origin"]["version"] = record["eventVersion"]
cloud_context["origin"] = {}
cloud_context["origin"]["service"] = {"name": "s3"}
cloud_context["origin"]["region"] = record["awsRegion"]
cloud_context["origin"]["provider"] = "aws"
metadata["service"] = {}
metadata["service"]["name"] = self.client.config.service_name
metadata["service"]["framework"] = {"name": "AWS Lambda"}
metadata["service"]["runtime"] = {
"name": os.environ.get("AWS_EXECUTION_ENV"),
"version": platform.python_version(),
}
metadata["service"]["version"] = self.client.config.service_version
metadata["service"]["node"] = {"configured_name": os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")}
# This is the one piece of metadata that requires deep merging. We add it manually
# here to avoid having to deep merge in _transport.add_metadata()
node = nested_key(self.client.get_service_info(), "node")
if node:
metadata["service"]["node"] = node
metadata["cloud"] = {}
metadata["cloud"]["provider"] = "aws"
metadata["cloud"]["region"] = os.environ.get("AWS_REGION")
metadata["cloud"]["service"] = {"name": "lambda"}
metadata["cloud"]["account"] = {"id": arn.split(":")[4]}
elasticapm.set_context(cloud_context, "cloud")
elasticapm.set_context(service_context, "service")
# faas doesn't actually belong in context, but we handle this in to_dict
elasticapm.set_context(faas, "faas")
if message_context:
elasticapm.set_context(message_context, "message")
self.client.add_extra_metadata(metadata)