elasticapm/instrumentation/packages/botocore.py (191 lines of code) (raw):

# BSD 3-Clause License # # Copyright (c) 2019, Elasticsearch BV # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # * Neither the name of the copyright holder nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import urllib.parse from collections import namedtuple from elasticapm.conf import constants from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule from elasticapm.traces import SpanType, capture_span, execution_context from elasticapm.utils.disttracing import TraceParent from elasticapm.utils.logging import get_logger logger = get_logger("elasticapm.instrument") SQS_MAX_ATTRIBUTES = 10 HandlerInfo = namedtuple("HandlerInfo", ("signature", "span_type", "span_subtype", "span_action", "context")) # Used for boto3 < 1.7 endpoint_to_service_id = {"SNS": "SNS", "S3": "S3", "DYNAMODB": "DynamoDB", "SQS": "SQS"} class BotocoreInstrumentation(AbstractInstrumentedModule): name = "botocore" instrument_list = [("botocore.client", "BaseClient._make_api_call")] capture_span_ctx = capture_span def _call(self, service, instance, args, kwargs): """ This is split out from `call()` so that it can be re-used by the aiobotocore instrumentation without duplicating all of this code. """ operation_name = kwargs.get("operation_name", args[0]) parsed_url = urllib.parse.urlparse(instance.meta.endpoint_url) context = { "destination": { "address": parsed_url.hostname, "port": parsed_url.port, "cloud": {"region": instance.meta.region_name}, } } handler_info = None handler = handlers.get(service, False) if handler: handler_info = handler(operation_name, service, instance, args, kwargs, context) if not handler_info: handler_info = handle_default(operation_name, service, instance, args, kwargs, context) return self.capture_span_ctx( handler_info.signature, span_type=handler_info.span_type, leaf=True, span_subtype=handler_info.span_subtype, span_action=handler_info.span_action, extra=handler_info.context, ) def _get_service(self, instance): service_model = instance.meta.service_model if hasattr(service_model, "service_id"): # added in boto3 1.7 service = service_model.service_id else: service = service_model.service_name.upper() service = endpoint_to_service_id.get(service, service) return service def call(self, module, method, wrapped, instance, args, kwargs): service = self._get_service(instance) ctx = self._call(service, instance, args, kwargs) with ctx as span: if service in pre_span_modifiers: pre_span_modifiers[service](span, args, kwargs) result = wrapped(*args, **kwargs) if service in post_span_modifiers: post_span_modifiers[service](span, args, kwargs, result) request_id = result.get("ResponseMetadata", {}).get("RequestId") if request_id: span.update_context("http", {"request": {"id": request_id}}) return result def handle_s3(operation_name, service, instance, args, kwargs, context): span_type = "storage" span_subtype = "s3" span_action = operation_name if len(args) > 1: bucket = args[1].get("Bucket", "") key = args[1].get("Key", "") else: # TODO handle Access Points bucket = "" key = "" if bucket or key: context["otel_attributes"] = {} if bucket: context["otel_attributes"]["aws.s3.bucket"] = bucket if key: context["otel_attributes"]["aws.s3.key"] = key signature = f"S3 {operation_name} {bucket}" context["destination"]["service"] = {"name": span_subtype, "resource": f"s3/{bucket}", "type": span_type} return HandlerInfo(signature, span_type, span_subtype, span_action, context) def handle_dynamodb(operation_name, service, instance, args, kwargs, context): span_type = "db" span_subtype = "dynamodb" span_action = "query" if len(args) > 1 and "TableName" in args[1]: table = args[1]["TableName"] else: table = "" signature = f"DynamoDB {operation_name} {table}".rstrip() context["db"] = {"type": "dynamodb", "instance": instance.meta.region_name} if operation_name == "Query" and len(args) > 1 and "KeyConditionExpression" in args[1]: context["db"]["statement"] = args[1]["KeyConditionExpression"] context["destination"]["service"] = {"name": span_subtype, "resource": table, "type": span_type} return HandlerInfo(signature, span_type, span_subtype, span_action, context) def handle_sns(operation_name, service, instance, args, kwargs, context): if operation_name != "Publish": # only "publish" is handled specifically, other endpoints get the default treatment return False span_type = "messaging" span_subtype = "sns" span_action = "send" topic_name = "" if len(args) > 1: if "Name" in args[1]: topic_name = args[1]["Name"] if "TopicArn" in args[1]: topic_name = args[1]["TopicArn"].rsplit(":", maxsplit=1)[-1] signature = f"SNS {operation_name} {topic_name}".rstrip() context["destination"]["service"] = { "name": span_subtype, "resource": f"{span_subtype}/{topic_name}" if topic_name else span_subtype, "type": span_type, } return HandlerInfo(signature, span_type, span_subtype, span_action, context) SQS_OPERATIONS = { "SendMessage": {"span_action": "send", "signature": "SEND to"}, "SendMessageBatch": {"span_action": "send_batch", "signature": "SEND_BATCH to"}, "ReceiveMessage": {"span_action": "receive", "signature": "RECEIVE from"}, "DeleteMessage": {"span_action": "delete", "signature": "DELETE from"}, "DeleteMessageBatch": {"span_action": "delete_batch", "signature": "DELETE_BATCH from"}, } def handle_sqs(operation_name, service, instance, args, kwargs, context): op = SQS_OPERATIONS.get(operation_name, None) if not op: # only "publish" is handled specifically, other endpoints get the default treatment return False span_type = "messaging" span_subtype = "sqs" topic_name = "" if len(args) > 1: topic_name = args[1]["QueueUrl"].rsplit("/", maxsplit=1)[-1] signature = f"SQS {op['signature']} {topic_name}".rstrip() if topic_name else f"SQS {op['signature']}" context["destination"]["service"] = { "name": span_subtype, "resource": f"{span_subtype}/{topic_name}" if topic_name else span_subtype, "type": span_type, } return HandlerInfo(signature, span_type, span_subtype, op["span_action"], context) def modify_span_sqs_pre(span, args, kwargs) -> None: operation_name = kwargs.get("operation_name", args[0]) if span.id: trace_parent = span.transaction.trace_parent.copy_from(span_id=span.id) else: # this is a dropped span, use transaction id instead transaction = execution_context.get_transaction() trace_parent = transaction.trace_parent.copy_from(span_id=transaction.id) attributes = {constants.TRACEPARENT_HEADER_NAME: {"DataType": "String", "StringValue": trace_parent.to_string()}} if trace_parent.tracestate: attributes[constants.TRACESTATE_HEADER_NAME] = {"DataType": "String", "StringValue": trace_parent.tracestate} if len(args) > 1: if operation_name in ("SendMessage", "SendMessageBatch"): attributes_count = len(attributes) if operation_name == "SendMessage": messages = [args[1]] else: messages = args[1]["Entries"] for message in messages: message["MessageAttributes"] = message.get("MessageAttributes") or {} if len(message["MessageAttributes"]) + attributes_count <= SQS_MAX_ATTRIBUTES: message["MessageAttributes"].update(attributes) else: logger.info("Not adding disttracing headers to message due to attribute limit reached") elif operation_name == "ReceiveMessage": message_attributes = args[1].setdefault("MessageAttributeNames", []) if "All" not in message_attributes: message_attributes.extend([constants.TRACEPARENT_HEADER_NAME, constants.TRACESTATE_HEADER_NAME]) def modify_span_sqs_post(span: SpanType, args, kwargs, result) -> None: operation_name = kwargs.get("operation_name", args[0]) if operation_name == "ReceiveMessage" and "Messages" in result: for message in result["Messages"][:1000]: # only up to 1000 span links are recorded if "MessageAttributes" in message and constants.TRACEPARENT_HEADER_NAME in message["MessageAttributes"]: tp = TraceParent.from_string( message["MessageAttributes"][constants.TRACEPARENT_HEADER_NAME]["StringValue"] ) span.add_link(tp) def handle_default(operation_name, service, instance, args, kwargs, context): span_type = "aws" span_subtype = service.lower() span_action = operation_name context["destination"]["service"] = {"name": span_subtype, "resource": span_subtype, "type": span_type} signature = f"{service}:{operation_name}" return HandlerInfo(signature, span_type, span_subtype, span_action, context) handlers = { "S3": handle_s3, "DynamoDB": handle_dynamodb, "SNS": handle_sns, "SQS": handle_sqs, "default": handle_default, } pre_span_modifiers = { "SQS": modify_span_sqs_pre, } post_span_modifiers = { "SQS": modify_span_sqs_post, }