elasticapm/instrumentation/packages/base.py (97 lines of code) (raw):

# BSD 3-Clause License # # Copyright (c) 2019, Elasticsearch BV # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # * Neither the name of the copyright holder nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
import functools
import os

import wrapt

from elasticapm.traces import execution_context
from elasticapm.utils.logging import get_logger

logger = get_logger("elasticapm.instrument")


class ElasticAPMFunctionWrapper(wrapt.FunctionWrapper):
    # used to differentiate between our own function wrappers and 1st/3rd party wrappers
    pass


class AbstractInstrumentedModule:
    """
    This class is designed to reduce the amount of code required to
    instrument library functions using wrapt.

    Instrumentation modules inherit from this class and override pieces as
    needed. Only `name`, `instrument_list`, and `call` are required in
    the inheriting class.

    The `instrument_list` is a list of (module, method) pairs that will be
    instrumented. The module/method need not be imported -- in fact, because
    instrumentation modules are all processed during the instrumentation
    process, lazy imports should be used in order to avoid ImportError
    exceptions.

    The `instrument()` method will be called for each InstrumentedModule
    listed in the instrument register (elasticapm.instrumentation.register),
    and each method in the `instrument_list` will be wrapped (using wrapt)
    with the `call_if_sampling()` function, which (by default) will either
    call the wrapped function by itself, or pass it into `call()` to be
    called if there is a transaction active.

    For simple span-wrapping of instrumented libraries, a very simple
    InstrumentedModule might look like this::

        from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
        from elasticapm.traces import capture_span

        class Jinja2Instrumentation(AbstractInstrumentedModule):
            name = "jinja2"
            instrument_list = [("jinja2", "Template.render")]

            def call(self, module, method, wrapped, instance, args, kwargs):
                signature = instance.name or instance.filename
                with capture_span(signature, span_type="template", span_subtype="jinja2", span_action="render"):
                    return wrapped(*args, **kwargs)

    This class can also be used to instrument callables which are expected to
    create their own transactions (rather than spans within a transaction).
    In this case, set `creates_transactions = True` next to your `name` and
    `instrument_list`. This tells the instrumenting code to always wrap the
    method with `call()`, even if there is no transaction active. It is
    expected in this case that a new transaction will be created as part of
    your `call()` method.
    """

    # Identifier of the instrumentation; must be overridden (checked in __init__).
    name = None
    # Not read anywhere in this module -- presumably consulted by subclasses or
    # other agent code; TODO confirm.
    mutates_unsampled_arguments = False
    # When True, `call_if_sampling()` always dispatches to `call()`.
    creates_transactions = False

    instrument_list = [
        # List of (module, method) pairs to instrument. E.g.:
        # ("requests.sessions", "Session.send"),
    ]

    def __init__(self) -> None:
        """Initialise bookkeeping and verify that the subclass set `name`."""
        # Maps (module, method) -> the original, unpatched object.
        self.originals = {}
        self.instrumented = False
        # NOTE(review): `assert` is stripped under `python -O`; kept as-is so the
        # exception type seen by existing callers does not change.
        assert self.name is not None

    def get_wrapped_name(self, wrapped, instance, fallback_method=None):
        """
        Build a dotted display name for a wrapped callable.

        The result is made of the class name of `instance` followed by
        `wrapped.__name__`. If `wrapped` has no `__name__` and
        `fallback_method` is a string of the form "Owner.method", the part
        after the dot is used instead.

        :param wrapped: the wrapped method/function object
        :param instance: the object the wrapped callable is bound to
        :param fallback_method: optional "Owner.method" string used when
            `wrapped` has no `__name__`
        :return: the name components joined with "."
        """
        wrapped_name = []
        if hasattr(instance, "__class__") and hasattr(instance.__class__, "__name__"):
            wrapped_name.append(instance.__class__.__name__)

        if hasattr(wrapped, "__name__"):
            wrapped_name.append(wrapped.__name__)
        elif fallback_method:
            attribute = fallback_method.split(".")
            # Only a two-part "Owner.method" fallback contributes a name.
            if len(attribute) == 2:
                wrapped_name.append(attribute[1])

        return ".".join(wrapped_name)

    def get_instrument_list(self):
        """Return the (module, method) pairs to instrument; override to compute them dynamically."""
        return self.instrument_list

    def instrument(self) -> None:
        """
        Patch every resolvable entry of the instrument list with a wrapper.

        Does nothing if this instance is already instrumented, or if the
        environment variable ``SKIP_INSTRUMENT_<NAME>`` is set (in which case
        the instance is *not* marked as instrumented). Entries that cannot be
        resolved are logged at debug level and skipped. The original objects
        are remembered in `self.originals` so `uninstrument()` can restore
        them.
        """
        if self.instrumented:
            return

        skip_env_var = "SKIP_INSTRUMENT_" + self.name.upper()
        if skip_env_var in os.environ:
            logger.debug("Skipping instrumentation of %s. %s is set.", self.name, skip_env_var)
            return
        try:
            instrument_list = self.get_instrument_list()
            skipped_modules = set()
            instrumented_methods = []

            for module, method in instrument_list:
                # Skip modules we already failed to load
                if module in skipped_modules:
                    continue
                try:
                    parent, attribute, original = wrapt.resolve_path(module, method)
                except ImportError:
                    # Could not import module
                    logger.debug("Skipping instrumentation of %s. Module %s not found", self.name, module)

                    # Keep track of modules we couldn't load, so we don't
                    # try to instrument anything in that module again
                    skipped_modules.add(module)
                    continue
                except AttributeError as ex:
                    # Could not find thing in module
                    logger.debug("Skipping instrumentation of %s.%s: %s", module, method, ex)
                    continue
                except Exception as ex:
                    # Another error occurred while importing the module.
                    logger.debug("Skipping instrumentation of %s.%s due to unknown error: %s", module, method, ex)
                    continue
                # Never stack a second agent wrapper on top of an existing one.
                if isinstance(original, ElasticAPMFunctionWrapper):
                    logger.debug("%s.%s already instrumented, skipping", module, method)
                    continue
                self.originals[(module, method)] = original
                wrapper = ElasticAPMFunctionWrapper(original, functools.partial(self.call_if_sampling, module, method))
                wrapt.apply_patch(parent, attribute, wrapper)
                instrumented_methods.append((module, method))
            if instrumented_methods:
                logger.debug("Instrumented %s, %s", self.name, ", ".join(".".join(m) for m in instrumented_methods))

        except ImportError as ex:
            # Reached when an ImportError escapes the per-entry handling above,
            # e.g. from `get_instrument_list()` itself.
            logger.debug("Skipping instrumentation of %s. %s", self.name, ex)
        # Set even when nothing could be patched, so the work is not retried.
        self.instrumented = True

    def uninstrument(self) -> None:
        """
        Restore the original objects for everything this instance patched.

        Returns early when the instance is not instrumented or when nothing
        was patched. NOTE(review): in the latter case `self.instrumented` is
        left untouched (it stays True), so a later `instrument()` call is a
        no-op -- confirm this is intended.
        """
        if not self.instrumented or not self.originals:
            return
        uninstrumented_methods = []
        for module, method in self.get_instrument_list():
            if (module, method) in self.originals:
                # Only the location is needed; the currently installed object
                # (third element) is discarded.
                parent, attribute, _ = wrapt.resolve_path(module, method)
                wrapt.apply_patch(parent, attribute, self.originals[(module, method)])
                uninstrumented_methods.append((module, method))
        if uninstrumented_methods:
            logger.debug("Uninstrumented %s, %s", self.name, ", ".join(".".join(m) for m in uninstrumented_methods))
        self.instrumented = False
        self.originals = {}

    def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
        """
        This is the function which will wrap the instrumented method/function.

        By default, will call the instrumented method/function, via `call()`,
        only if a transaction is active and sampled. This behavior can be
        overridden by setting `creates_transactions = True` at the class
        level.

        If `creates_transactions == False` and there's an active transaction
        with `transaction.is_sampled == False` (or `transaction.pause_sampling`
        set), then the `mutate_unsampled_call_args()` method is called, and the
        resulting args and kwargs are passed into the wrapped function
        directly, not via `call()`. This can e.g. be used to add traceparent
        headers to the underlying http call for HTTP instrumentations, even if
        we're not sampling the transaction.
        """
        if self.creates_transactions:
            return self.call(module, method, wrapped, instance, args, kwargs)
        transaction = execution_context.get_transaction()
        if not transaction:
            # No active transaction: behave exactly like the unpatched callable.
            return wrapped(*args, **kwargs)
        elif not transaction.is_sampled or transaction.pause_sampling:
            args, kwargs = self.mutate_unsampled_call_args(module, method, wrapped, instance, args, kwargs, transaction)
            return wrapped(*args, **kwargs)
        else:
            return self.call(module, method, wrapped, instance, args, kwargs)

    def mutate_unsampled_call_args(self, module, method, wrapped, instance, args, kwargs, transaction):
        """
        Method called for unsampled wrapped calls. This can e.g. be used to
        add traceparent headers to the underlying http call for HTTP
        instrumentations.

        The default implementation returns `args` and `kwargs` unchanged.

        :param module: Name of the wrapped module
        :param method: Name of the wrapped method/function
        :param wrapped: the wrapped method/function object
        :param instance: the wrapped instance
        :param args: arguments to the wrapped method/function
        :param kwargs: keyword arguments to the wrapped method/function
        :param transaction: the currently active transaction
        :return: an `(args, kwargs)` tuple to call `wrapped` with
        """
        return args, kwargs

    def call(self, module, method, wrapped, instance, args, kwargs):
        """
        Wrapped call. This method should gather all necessary data, then call
        `wrapped` in a `capture_span` context manager.

        Note that by default this wrapper will only be used if a transaction is
        currently active. If you want the ability to create a transaction in
        your `call()` method, set `creates_transactions = True` at the class
        level.

        :param module: Name of the wrapped module
        :param method: Name of the wrapped method/function
        :param wrapped: the wrapped method/function object
        :param instance: the wrapped instance
        :param args: arguments to the wrapped method/function
        :param kwargs: keyword arguments to the wrapped method/function
        :return: the result of calling the wrapped method/function
        :raises NotImplementedError: always, unless overridden by the subclass
        """
        raise NotImplementedError