elasticapm/traces.py (907 lines of code) (raw):
# BSD 3-Clause License
#
# Copyright (c) 2019, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import functools
import random
import re
import threading
import time
import timeit
import urllib.parse
import warnings
from collections import defaultdict
from datetime import timedelta
from types import TracebackType
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, TypeVar, Union
import elasticapm
from elasticapm.conf import constants
from elasticapm.conf.constants import LABEL_RE, SPAN, TRANSACTION
from elasticapm.context import init_execution_context
from elasticapm.metrics.base_metrics import Timer
from elasticapm.utils import encoding, get_name_from_func, nested_key, url_to_destination_resource
from elasticapm.utils.disttracing import TraceParent
from elasticapm.utils.logging import get_logger
from elasticapm.utils.time import time_to_perf_counter
__all__ = ("capture_span", "label", "set_transaction_name", "set_custom_context", "set_user_context")
error_logger = get_logger("elasticapm.errors")
logger = get_logger("elasticapm.traces")
_time_func = timeit.default_timer
execution_context = init_execution_context()
SpanType = Union["Span", "DroppedSpan"]
FuncType = Callable[..., Any]
_AnnotatedFunctionT = TypeVar("_AnnotatedFunctionT", bound=FuncType)
class ChildDuration(object):
__slots__ = ("obj", "_nesting_level", "_start", "_duration", "_lock")
def __init__(self, obj: "BaseSpan") -> None:
self.obj = obj
self._nesting_level: int = 0
self._start: float = 0
self._duration: timedelta = timedelta(seconds=0)
self._lock = threading.Lock()
def start(self, timestamp: float) -> None:
with self._lock:
self._nesting_level += 1
if self._nesting_level == 1:
self._start = timestamp
def stop(self, timestamp: float) -> None:
with self._lock:
self._nesting_level -= 1
if self._nesting_level == 0:
self._duration += timedelta(seconds=timestamp - self._start)
@property
def duration(self) -> timedelta:
return self._duration
class BaseSpan(object):
def __init__(self, labels=None, start=None, links: Optional[Sequence[TraceParent]] = None) -> None:
self._child_durations = ChildDuration(self)
self.labels = {}
self.outcome: Optional[str] = None
self.compression_buffer: Optional[Union[Span, DroppedSpan]] = None
self.compression_buffer_lock = threading.Lock()
self.start_time: float = time_to_perf_counter(start) if start is not None else _time_func()
self.ended_time: Optional[float] = None
self.duration: Optional[timedelta] = None
self.links: Optional[List[Dict[str, str]]] = None
if links:
for trace_parent in links:
self.add_link(trace_parent)
if labels:
self.label(**labels)
def child_started(self, timestamp) -> None:
self._child_durations.start(timestamp)
def child_ended(self, child: SpanType) -> None:
with self.compression_buffer_lock:
if not child.is_compression_eligible():
if self.compression_buffer:
self.compression_buffer.report()
self.compression_buffer = None
child.report()
elif self.compression_buffer is None:
self.compression_buffer = child
elif not self.compression_buffer.try_to_compress(child):
self.compression_buffer.report()
self.compression_buffer = child
def end(self, skip_frames: int = 0, duration: Optional[timedelta] = None) -> None:
self.ended_time = _time_func()
self.duration = duration if duration is not None else timedelta(seconds=self.ended_time - self.start_time)
if self.compression_buffer:
self.compression_buffer.report()
self.compression_buffer = None
def to_dict(self) -> dict:
raise NotImplementedError()
def label(self, **labels) -> None:
"""
Label this span with one or multiple key/value labels. Keys should be strings, values can be strings, booleans,
or numerical values (int, float, Decimal)
span_obj.label(key1="value1", key2=True, key3=42)
Note that keys will be dedotted, replacing dot (.), star (*) and double quote (") with an underscore (_)
:param labels: key/value pairs of labels
:return: None
"""
labels = encoding.enforce_label_format(labels)
self.labels.update(labels)
def add_link(self, trace_parent: TraceParent) -> None:
"""
Causally link this span/transaction to another span/transaction
"""
if self.links is None:
self.links = []
self.links.append({"trace_id": trace_parent.trace_id, "span_id": trace_parent.span_id})
def set_success(self) -> None:
self.outcome = constants.OUTCOME.SUCCESS
def set_failure(self) -> None:
self.outcome = constants.OUTCOME.FAILURE
@staticmethod
def get_dist_tracing_id() -> str:
return "%016x" % random.getrandbits(64)
@property
def tracer(self) -> "Tracer":
raise NotImplementedError()
class Transaction(BaseSpan):
def __init__(
self,
tracer: "Tracer",
transaction_type: str = "custom",
trace_parent: Optional[TraceParent] = None,
is_sampled: bool = True,
start: Optional[float] = None,
sample_rate: Optional[float] = None,
links: Optional[Sequence[TraceParent]] = None,
) -> None:
"""
tracer
Tracer object
transaction_type
Transaction type
trace_parent
TraceParent object representing the parent trace and trace state
is_sampled
Whether or not this transaction is sampled
start
Optional start timestamp. This is expected to be an epoch timestamp
in seconds (such as from `time.time()`). If it is not, it's recommended
that a `duration` is passed into the `end()` method.
sample_rate
Sample rate which was used to decide whether to sample this transaction.
This is reported to the APM server so that unsampled transactions can
be extrapolated.
links:
A list of traceparents to link this transaction causally
"""
self.id = self.get_dist_tracing_id()
if not trace_parent:
trace_parent = TraceParent.new(self.id, is_sampled)
# While this is set to True, underlying instrumentations will not create
# spans. The same outcome could be reached via `is_sampled`, but
# that requires a user to correctly restore the previous value. This
# setting works independent of the is_sampled setting, and is assumed to
# be False unless a user sets it to True.
self.pause_sampling = False
self.trace_parent: TraceParent = trace_parent
self.timestamp = start if start is not None else time.time()
self.name: Optional[str] = None
self.result: Optional[str] = None
self.transaction_type = transaction_type
self._tracer = tracer
# The otel bridge uses Transactions/Spans interchangeably -- storing
# a reference to the Transaction in the Transaction simplifies things.
self.transaction = self
self.config_span_compression_enabled = tracer.config.span_compression_enabled
self.config_span_compression_exact_match_max_duration = tracer.config.span_compression_exact_match_max_duration
self.config_span_compression_same_kind_max_duration = tracer.config.span_compression_same_kind_max_duration
self.config_exit_span_min_duration = tracer.config.exit_span_min_duration
self.config_transaction_max_spans = tracer.config.transaction_max_spans
self.dropped_spans: int = 0
self.context: Dict[str, Any] = {}
self._is_sampled = is_sampled
self.sample_rate = sample_rate
self._span_counter: int = 0
self._span_timers: Dict[Tuple[str, str], Timer] = defaultdict(Timer)
self._span_timers_lock = threading.Lock()
self._dropped_span_statistics = defaultdict(lambda: {"count": 0, "duration.sum.us": 0})
try:
self._breakdown = self.tracer._agent.metrics.get_metricset(
"elasticapm.metrics.sets.breakdown.BreakdownMetricSet"
)
except (LookupError, AttributeError):
self._breakdown = None
super().__init__(start=start)
if links:
for trace_parent in links:
self.add_link(trace_parent)
def end(self, skip_frames: int = 0, duration: Optional[timedelta] = None) -> None:
super().end(skip_frames, duration)
if self._breakdown:
for (span_type, span_subtype), timer in self._span_timers.items():
labels = {
"span.type": span_type,
"transaction.name": self.name,
"transaction.type": self.transaction_type,
}
if span_subtype:
labels["span.subtype"] = span_subtype
val = timer.val
self._breakdown.timer("span.self_time", reset_on_collect=True, unit="us", **labels).update(
val[0], val[1]
)
if self.is_sampled:
self._breakdown.timer(
"span.self_time",
reset_on_collect=True,
unit="us",
**{"span.type": "app", "transaction.name": self.name, "transaction.type": self.transaction_type},
).update((self.duration - self._child_durations.duration).total_seconds() * 1_000_000)
def _begin_span(
self,
name,
span_type,
context=None,
leaf=False,
labels=None,
parent_span_id=None,
span_subtype=None,
span_action=None,
sync=None,
start=None,
auto_activate=True,
links: Optional[Sequence[TraceParent]] = None,
):
parent_span = execution_context.get_span()
tracer = self.tracer
if parent_span and parent_span.leaf:
span = DroppedSpan(parent_span, leaf=True)
elif self.config_transaction_max_spans and self._span_counter > self.config_transaction_max_spans - 1:
self.dropped_spans += 1
span = DroppedSpan(parent_span, context=context)
else:
span = Span(
transaction=self,
name=name,
span_type=span_type or "code.custom",
context=context,
leaf=leaf,
labels=labels,
parent=parent_span,
parent_span_id=parent_span_id,
span_subtype=span_subtype,
span_action=span_action,
sync=sync,
start=start,
links=links,
)
span.frames = tracer.frames_collector_func()
self._span_counter += 1
if auto_activate:
execution_context.set_span(span)
return span
def begin_span(
self,
name,
span_type,
context=None,
leaf=False,
labels=None,
span_subtype=None,
span_action=None,
sync=None,
start=None,
auto_activate=True,
links: Optional[Sequence[TraceParent]] = None,
):
"""
Begin a new span
:param name: name of the span
:param span_type: type of the span
:param context: a context dict
:param leaf: True if this is a leaf span
:param labels: a flat string/string dict of labels
:param span_subtype: sub type of the span, e.g. "postgresql"
:param span_action: action of the span , e.g. "query"
:param sync: indicate if the span is synchronous or not. In most cases, `None` should be used
:param start: timestamp, mostly useful for testing
:param auto_activate: whether to set this span in execution_context
:param links: an optional list of traceparents to link this span with
:return: the Span object
"""
return self._begin_span(
name,
span_type,
context=context,
leaf=leaf,
labels=labels,
parent_span_id=None,
span_subtype=span_subtype,
span_action=span_action,
sync=sync,
start=start,
auto_activate=auto_activate,
links=links,
)
def end_span(self, skip_frames: int = 0, duration: Optional[float] = None, outcome: str = "unknown"):
"""
End the currently active span
:param skip_frames: numbers of frames to skip in the stack trace
:param duration: override duration, mostly useful for testing
:param outcome: outcome of the span, either success, failure or unknown
:return: the ended span
"""
span = execution_context.get_span()
if span is None:
raise LookupError()
# only overwrite span outcome if it is still unknown
if not span.outcome or span.outcome == "unknown":
span.outcome = outcome
span.end(skip_frames=skip_frames, duration=duration)
return span
def ensure_parent_id(self) -> str:
"""If current trace_parent has no span_id, generate one, then return it
This is used to generate a span ID which the RUM agent will use to correlate
the RUM transaction with the backend transaction.
"""
if self.trace_parent.span_id == self.id:
self.trace_parent.span_id = "%016x" % random.getrandbits(64)
logger.debug("Set parent id to generated %s", self.trace_parent.span_id)
return self.trace_parent.span_id
def to_dict(self) -> dict:
# This copy() only covers top level `.pop()` calls, so if we ever start
# modifying nested data, we'll need to do a deep copy.
context = self.context.copy()
context["tags"] = self.labels
result = {
"id": self.id,
"trace_id": self.trace_parent.trace_id,
"name": encoding.keyword_field(self.name or ""),
"type": encoding.keyword_field(self.transaction_type),
"duration": self.duration.total_seconds() * 1000 if self.duration else None,
"result": encoding.keyword_field(str(self.result)),
"timestamp": int(self.timestamp * 1_000_000), # microseconds
"outcome": self.outcome,
"sampled": self.is_sampled,
"span_count": {"started": self._span_counter, "dropped": self.dropped_spans},
}
if self._dropped_span_statistics:
result["dropped_spans_stats"] = [
{
"destination_service_resource": resource,
"service_target_type": target_type,
"service_target_name": target_name,
"outcome": outcome,
"duration": {"count": v["count"], "sum": {"us": int(v["duration.sum.us"])}},
}
for (resource, outcome, target_type, target_name), v in self._dropped_span_statistics.items()
]
if self.sample_rate is not None:
result["sample_rate"] = float(self.sample_rate)
if self.trace_parent:
result["trace_id"] = self.trace_parent.trace_id
# only set parent_id if this transaction isn't the root
if self.trace_parent.span_id and self.trace_parent.span_id != self.id:
result["parent_id"] = self.trace_parent.span_id
if self.links:
result["links"] = self.links
# faas context belongs top-level on the transaction
if "faas" in context:
result["faas"] = context.pop("faas")
# otel attributes and spankind need to be top-level
if "otel_spankind" in context:
result["otel"] = {"span_kind": context.pop("otel_spankind")}
# Some transaction_store_tests use the Tracer without a Client -- the
# extra check against `get_client()` is here to make those tests pass
if elasticapm.get_client() and elasticapm.get_client().check_server_version(gte=(7, 16)):
if "otel_attributes" in context:
if "otel" not in result:
result["otel"] = {"attributes": context.pop("otel_attributes")}
else:
result["otel"]["attributes"] = context.pop("otel_attributes")
else:
# Attributes map to labels for older versions
attributes = context.pop("otel_attributes", {})
for key, value in attributes.items():
result["context"]["tags"][key] = value
if self.is_sampled:
result["context"] = context
return result
def track_span_duration(self, span_type, span_subtype, self_duration) -> None:
# TODO: once asynchronous spans are supported, we should check if the transaction is already finished
# TODO: and, if it has, exit without tracking.
with self._span_timers_lock:
self._span_timers[(span_type, span_subtype)].update(self_duration.total_seconds() * 1_000_000)
@property
def is_sampled(self) -> bool:
return self._is_sampled
@is_sampled.setter
def is_sampled(self, is_sampled: bool) -> None:
"""
This should never be called in normal operation, but often is used
for testing. We just want to make sure our sample_rate comes out correctly
in tracestate if we set is_sampled to False.
"""
self._is_sampled = is_sampled
if not is_sampled:
if self.sample_rate:
self.sample_rate = "0"
self.trace_parent.add_tracestate(constants.TRACESTATE.SAMPLE_RATE, self.sample_rate)
@property
def tracer(self) -> "Tracer":
return self._tracer
def track_dropped_span(self, span: SpanType) -> None:
with self._span_timers_lock:
try:
resource = span.context["destination"]["service"]["resource"]
target_type = nested_key(span.context, "service", "target", "type")
target_name = nested_key(span.context, "service", "target", "name")
stats = self._dropped_span_statistics[(resource, span.outcome, target_type, target_name)]
stats["count"] += 1
stats["duration.sum.us"] += int(span.duration.total_seconds() * 1_000_000)
except KeyError:
pass
class Span(BaseSpan):
__slots__ = (
"id",
"transaction",
"name",
"type",
"subtype",
"action",
"context",
"leaf",
"dist_tracing_propagated",
"timestamp",
"start_time",
"ended_time",
"duration",
"parent",
"parent_span_id",
"frames",
"labels",
"sync",
"outcome",
"_child_durations",
"_cancelled",
)
def __init__(
self,
transaction: Transaction,
name: str,
span_type: str,
context: Optional[dict] = None,
leaf: bool = False,
labels: Optional[dict] = None,
parent: Optional["Span"] = None,
parent_span_id: Optional[str] = None,
span_subtype: Optional[str] = None,
span_action: Optional[str] = None,
sync: Optional[bool] = None,
start: Optional[int] = None,
links: Optional[Sequence[TraceParent]] = None,
) -> None:
"""
Create a new Span
:param transaction: transaction object that this span relates to
:param name: Generic name of the span
:param span_type: type of the span, e.g. db
:param context: context dictionary
:param leaf: is this span a leaf span?
:param labels: a dict of labels
:param parent_span_id: override of the span ID
:param span_subtype: sub type of the span, e.g. mysql
:param span_action: sub type of the span, e.g. query
:param sync: indicate if the span was executed synchronously or asynchronously
:param start: timestamp, mostly useful for testing
"""
self.id = self.get_dist_tracing_id()
self.transaction = transaction
self.name = name or "unnamed"
self.context = context if context is not None else {}
self.leaf = leaf
# timestamp is bit of a mix of monotonic and non-monotonic time sources.
# we take the (non-monotonic) transaction timestamp, and add the (monotonic) difference of span
# start time and transaction start time. In this respect, the span timestamp is guaranteed to grow
# monotonically with respect to the transaction timestamp
self.parent = parent
self.parent_span_id = parent_span_id
self.frames = None
self.sync = sync
self.type = span_type
self.subtype = span_subtype
self.action = span_action
self.dist_tracing_propagated = False
self.composite: Dict[str, Any] = {}
self._cancelled: bool = False
super().__init__(labels=labels, start=start, links=links)
self.timestamp = transaction.timestamp + (self.start_time - transaction.start_time)
if self.transaction._breakdown:
p = self.parent if self.parent else self.transaction
p.child_started(self.start_time)
def to_dict(self) -> dict:
if (
self.composite
and self.composite["compression_strategy"] == "same_kind"
and nested_key(self.context, "destination", "service", "resource")
):
name = "Calls to " + self.context["destination"]["service"]["resource"]
else:
name = self.name
result = {
"id": self.id,
"transaction_id": self.transaction.id,
"trace_id": self.transaction.trace_parent.trace_id,
# use either the explicitly set parent_span_id, or the id of the parent, or finally the transaction id
"parent_id": self.parent_span_id or (self.parent.id if self.parent else self.transaction.id),
"name": encoding.keyword_field(name),
"type": encoding.keyword_field(self.type),
"subtype": encoding.keyword_field(self.subtype),
"action": encoding.keyword_field(self.action),
"timestamp": int(self.timestamp * 1000000), # microseconds
"duration": self.duration.total_seconds() * 1000,
"outcome": self.outcome,
}
if self.transaction.sample_rate is not None:
result["sample_rate"] = float(self.transaction.sample_rate)
if self.sync is not None:
result["sync"] = self.sync
if self.labels:
if self.context is None:
self.context = {}
self.context["tags"] = self.labels
if self.links:
result["links"] = self.links
if self.context:
self.autofill_resource_context()
# otel attributes and spankind need to be top-level
if "otel_spankind" in self.context:
result["otel"] = {"span_kind": self.context.pop("otel_spankind")}
if self.tracer._agent.check_server_version(gte=(7, 16)):
if "otel_attributes" in self.context:
if "otel" not in result:
result["otel"] = {"attributes": self.context.pop("otel_attributes")}
else:
result["otel"]["attributes"] = self.context.pop("otel_attributes")
else:
# Attributes map to labels for older versions
attributes = self.context.pop("otel_attributes", {})
if attributes and ("tags" not in self.context):
self.context["tags"] = {}
for key, value in attributes.items():
self.context["tags"][key] = value
result["context"] = self.context
if self.frames:
result["stacktrace"] = self.frames
if self.composite:
result["composite"] = {
"compression_strategy": self.composite["compression_strategy"],
"sum": self.composite["sum"].total_seconds() * 1000,
"count": self.composite["count"],
}
return result
def is_same_kind(self, other_span: SpanType) -> bool:
"""
For compression purposes, two spans are considered to be of the same kind if they have the same
values for type, subtype, and destination.service.resource
:param other_span: another span object
:return: bool
"""
target_type = nested_key(self.context, "service", "target", "type")
target_name = nested_key(self.context, "service", "target", "name")
return bool(
self.type == other_span.type
and self.subtype == other_span.subtype
and (target_type or target_name)
and target_type == nested_key(other_span.context, "service", "target", "type")
and target_name == nested_key(other_span.context, "service", "target", "name")
)
def is_exact_match(self, other_span: SpanType) -> bool:
"""
For compression purposes, two spans are considered to be an exact match if the have the same
name and are of the same kind.
:param other_span: another span object
:return: bool
"""
return bool(self.name == other_span.name and self.is_same_kind(other_span))
def is_compression_eligible(self) -> bool:
"""
Determine if this span is eligible for compression.
"""
if self.transaction.config_span_compression_enabled:
return self.leaf and not self.dist_tracing_propagated and self.outcome in (None, constants.OUTCOME.SUCCESS)
return False
@property
def discardable(self) -> bool:
return self.leaf and not self.dist_tracing_propagated and self.outcome == constants.OUTCOME.SUCCESS
def end(self, skip_frames: int = 0, duration: Optional[float] = None) -> None:
"""
End this span and queue it for sending.
:param skip_frames: amount of frames to skip from the beginning of the stack trace
:param duration: override duration, mostly useful for testing
:return: None
"""
self.autofill_resource_context()
self.autofill_service_target()
super().end(skip_frames, duration)
tracer = self.transaction.tracer
if (
tracer.span_stack_trace_min_duration >= timedelta(seconds=0)
and self.duration >= tracer.span_stack_trace_min_duration
and self.frames
):
self.frames = tracer.frames_processing_func(self.frames)[skip_frames:]
else:
self.frames = None
current_span = execution_context.get_span()
# Because otel can detach context without ending the span, we need to
# make sure we only unset the span if it's currently set.
if current_span is self:
execution_context.unset_span()
p = self.parent if self.parent else self.transaction
if self.transaction._breakdown:
p._child_durations.stop(self.start_time + self.duration.total_seconds())
self.transaction.track_span_duration(
self.type, self.subtype, self.duration - self._child_durations.duration
)
p.child_ended(self)
def report(self) -> None:
if self.discardable and self.duration < self.transaction.config_exit_span_min_duration:
self.transaction.track_dropped_span(self)
self.transaction.dropped_spans += 1
elif self._cancelled:
self.transaction._span_counter -= 1
else:
self.tracer.queue_func(SPAN, self.to_dict())
def try_to_compress(self, sibling: SpanType) -> bool:
compression_strategy = (
self._try_to_compress_composite(sibling) if self.composite else self._try_to_compress_regular(sibling)
)
if not compression_strategy:
return False
if not self.composite:
self.composite = {"compression_strategy": compression_strategy, "count": 1, "sum": self.duration}
self.composite["count"] += 1
self.composite["sum"] += sibling.duration
self.duration = timedelta(seconds=sibling.ended_time - self.start_time)
self.transaction._span_counter -= 1
return True
def _try_to_compress_composite(self, sibling: SpanType) -> Optional[str]:
if self.composite["compression_strategy"] == "exact_match":
return (
"exact_match"
if (
self.is_exact_match(sibling)
and sibling.duration <= self.transaction.config_span_compression_exact_match_max_duration
)
else None
)
elif self.composite["compression_strategy"] == "same_kind":
return (
"same_kind"
if (
self.is_same_kind(sibling)
and sibling.duration <= self.transaction.config_span_compression_same_kind_max_duration
)
else None
)
return None
def _try_to_compress_regular(self, sibling: SpanType) -> Optional[str]:
if not self.is_same_kind(sibling):
return None
if self.name == sibling.name:
max_duration = self.transaction.config_span_compression_exact_match_max_duration
if self.duration <= max_duration and sibling.duration <= max_duration:
return "exact_match"
return None
max_duration = self.transaction.config_span_compression_same_kind_max_duration
if self.duration <= max_duration and sibling.duration <= max_duration:
return "same_kind"
return None
def update_context(self, key, data) -> None:
"""
Update the context data for given key
:param key: the key, e.g. "db"
:param data: a dictionary
:return: None
"""
current = self.context.get(key, {})
current.update(data)
self.context[key] = current
def autofill_resource_context(self) -> None:
"""Automatically fills "resource" fields based on other fields"""
if self.context:
resource = nested_key(self.context, "destination", "service", "resource")
if not resource and (self.leaf or any(k in self.context for k in ("destination", "db", "message", "http"))):
type_info = self.subtype or self.type
instance = nested_key(self.context, "db", "instance")
queue_name = nested_key(self.context, "message", "queue", "name")
http_url = nested_key(self.context, "http", "url")
if instance:
resource = f"{type_info}/{instance}"
elif queue_name:
resource = f"{type_info}/{queue_name}"
elif http_url:
resource = url_to_destination_resource(http_url)
else:
resource = type_info
if "destination" not in self.context:
self.context["destination"] = {}
if "service" not in self.context["destination"]:
self.context["destination"]["service"] = {}
self.context["destination"]["service"]["resource"] = resource
# set fields that are deprecated, but still required by APM Server API
if "name" not in self.context["destination"]["service"]:
self.context["destination"]["service"]["name"] = ""
if "type" not in self.context["destination"]["service"]:
self.context["destination"]["service"]["type"] = ""
def autofill_service_target(self) -> None:
if self.leaf:
service_target = nested_key(self.context, "service", "target") or {}
if "type" not in service_target: # infer type from span type & subtype
# use sub-type if provided, fallback on type othewise
service_target["type"] = self.subtype or self.type
if "name" not in service_target: # infer name from span attributes
if nested_key(self.context, "db", "instance"): # database spans
service_target["name"] = self.context["db"]["instance"]
elif "message" in self.context: # messaging spans
service_target["name"] = self.context["message"]["queue"]["name"]
elif nested_key(self.context, "http", "url"): # http spans
url = self.context["http"]["url"]
parsed_url = urllib.parse.urlparse(url)
service_target["name"] = parsed_url.hostname
if parsed_url.port:
service_target["name"] += f":{parsed_url.port}"
if "service" not in self.context:
self.context["service"] = {}
self.context["service"]["target"] = service_target
elif nested_key(self.context, "service", "target"):
# non-exit spans should not have service.target.* fields
del self.context["service"]["target"]
def cancel(self) -> None:
"""
Mark span as cancelled. Cancelled spans don't count towards started spans nor dropped spans.
No checks are made to ensure that spans which already propagated distributed context are not
cancelled.
"""
self._cancelled = True
def __str__(self):
return "{}/{}/{}".format(self.name, self.type, self.subtype)
@property
def tracer(self) -> "Tracer":
return self.transaction.tracer
class DroppedSpan(BaseSpan):
__slots__ = ("leaf", "parent", "id", "context", "outcome", "dist_tracing_propagated")
def __init__(self, parent, leaf=False, start=None, context=None) -> None:
self.parent = parent
self.leaf = leaf
self.id = None
self.dist_tracing_propagated = False
self.context = context
self.outcome = constants.OUTCOME.UNKNOWN
super(DroppedSpan, self).__init__(start=start)
def end(self, skip_frames: int = 0, duration: Optional[float] = None) -> None:
super().end(skip_frames, duration)
execution_context.unset_span()
def child_started(self, timestamp) -> None:
pass
def child_ended(self, child: SpanType) -> None:
pass
def update_context(self, key, data) -> None:
pass
def report(self) -> None:
pass
def try_to_compress(self, sibling: SpanType) -> bool:
return False
def is_compression_eligible(self) -> bool:
return False
@property
def name(self):
return "DroppedSpan"
@property
def type(self):
return None
@property
def subtype(self):
return None
@property
def action(self):
return None
class Tracer(object):
def __init__(
self, frames_collector_func, frames_processing_func, queue_func, config, agent: "elasticapm.Client"
) -> None:
self.config = config
self.queue_func = queue_func
self.frames_processing_func = frames_processing_func
self.frames_collector_func = frames_collector_func
self._agent = agent
self._ignore_patterns = [re.compile(p) for p in config.transactions_ignore_patterns or []]
@property
def span_stack_trace_min_duration(self) -> timedelta:
if self.config.span_stack_trace_min_duration != timedelta(
seconds=0.005
) or self.config.span_frames_min_duration == timedelta(seconds=0.005):
# No need to check span_frames_min_duration
return self.config.span_stack_trace_min_duration
else:
# span_stack_trace_min_duration is default value and span_frames_min_duration is non-default.
# warn and use span_frames_min_duration
warnings.warn(
"`span_frames_min_duration` is deprecated. Please use `span_stack_trace_min_duration`.",
DeprecationWarning,
)
if self.config.span_frames_min_duration < timedelta(seconds=0):
return timedelta(seconds=0)
elif self.config.span_frames_min_duration == timedelta(seconds=0):
return timedelta(seconds=-1)
else:
return self.config.span_frames_min_duration
def begin_transaction(
self,
transaction_type: str,
trace_parent: Optional[TraceParent] = None,
start: Optional[float] = None,
auto_activate: bool = True,
links: Optional[Sequence[TraceParent]] = None,
) -> Transaction:
"""
Start a new transactions and bind it in a thread-local variable
:param transaction_type: type of the transaction, e.g. "request"
:param trace_parent: an optional TraceParent object
:param start: override the start timestamp, mostly useful for testing
:param auto_activate: whether to set this transaction in execution_context
:param links: list of traceparents to causally link this transaction to
:returns the Transaction object
"""
links = links if links else []
continuation_strategy = self.config.trace_continuation_strategy
# we restart the trace if continuation strategy is "restart", or if it is "restart_external" and our
# "es" key is not in the tracestate header. In both cases, the original TraceParent is added to trace links.
if trace_parent and continuation_strategy != constants.TRACE_CONTINUATION_STRATEGY.CONTINUE:
if continuation_strategy == constants.TRACE_CONTINUATION_STRATEGY.RESTART or (
continuation_strategy == constants.TRACE_CONTINUATION_STRATEGY.RESTART_EXTERNAL
and not trace_parent.tracestate_dict
):
links.append(trace_parent)
trace_parent = None
if trace_parent:
is_sampled = bool(trace_parent.trace_options.recorded)
sample_rate = trace_parent.tracestate_dict.get(constants.TRACESTATE.SAMPLE_RATE)
else:
is_sampled = (
self.config.transaction_sample_rate == 1.0 or self.config.transaction_sample_rate > random.random()
)
if not is_sampled:
sample_rate = "0"
else:
sample_rate = str(self.config.transaction_sample_rate)
transaction = Transaction(
self,
transaction_type,
trace_parent=trace_parent,
is_sampled=is_sampled,
start=start,
sample_rate=sample_rate,
links=links,
)
if trace_parent is None:
transaction.trace_parent.add_tracestate(constants.TRACESTATE.SAMPLE_RATE, sample_rate)
if auto_activate:
execution_context.set_transaction(transaction)
return transaction
def end_transaction(self, result=None, transaction_name=None, duration=None):
"""
End the current transaction and queue it for sending
:param result: result of the transaction, e.g. "OK" or 200
:param transaction_name: name of the transaction
:param duration: override duration, mostly useful for testing
:return:
"""
transaction = execution_context.get_transaction(clear=True)
if transaction:
if transaction.name is None:
transaction.name = str(transaction_name) if transaction_name is not None else ""
transaction.end(duration=duration)
if self._should_ignore(transaction.name):
return
if not transaction.is_sampled and self._agent.check_server_version(gte=(8, 0)):
return
if transaction.result is None:
transaction.result = result
self.queue_func(TRANSACTION, transaction.to_dict())
return transaction
def _should_ignore(self, transaction_name):
for pattern in self._ignore_patterns:
if pattern.search(transaction_name):
return True
return False
class capture_span(object):
__slots__ = (
"name",
"type",
"subtype",
"action",
"extra",
"skip_frames",
"leaf",
"labels",
"duration",
"start",
"sync",
"links",
)
def __init__(
self,
name: Optional[str] = None,
span_type: str = "code.custom",
extra: Optional[dict] = None,
skip_frames: int = 0,
leaf: bool = False,
labels: Optional[dict] = None,
span_subtype: Optional[str] = None,
span_action: Optional[str] = None,
start: Optional[int] = None,
duration: Optional[Union[float, timedelta]] = None,
sync: Optional[bool] = None,
links: Optional[Sequence[TraceParent]] = None,
) -> None:
self.name = name
if span_subtype is None and "." in span_type:
# old style dotted type, let's split it up
type_bits = span_type.split(".")
if len(type_bits) == 2:
span_type, span_subtype = type_bits[:2]
else:
span_type, span_subtype, span_action = type_bits[:3]
self.type = span_type
self.subtype = span_subtype
self.action = span_action
self.extra = extra
self.skip_frames = skip_frames
self.leaf = leaf
self.labels = labels
self.start = start
if duration and not isinstance(duration, timedelta):
duration = timedelta(seconds=duration)
self.duration = duration
self.sync = sync
self.links = links
def __call__(self, func: _AnnotatedFunctionT) -> _AnnotatedFunctionT:
self.name = self.name or get_name_from_func(func)
@functools.wraps(func)
def decorated(*args, **kwds):
with self:
return func(*args, **kwds)
return decorated
def __enter__(self) -> Optional[SpanType]:
return self.handle_enter(self.sync)
def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
self.handle_exit(exc_type, exc_val, exc_tb)
def handle_enter(self, sync: bool) -> Optional[SpanType]:
transaction = execution_context.get_transaction()
if transaction and transaction.is_sampled:
return transaction.begin_span(
self.name,
self.type,
context=self.extra,
leaf=self.leaf,
labels=self.labels,
span_subtype=self.subtype,
span_action=self.action,
start=self.start,
sync=sync,
links=self.links,
)
return None
def handle_exit(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
transaction = execution_context.get_transaction()
if transaction and transaction.is_sampled:
try:
outcome = "failure" if exc_val else "success"
span = transaction.end_span(self.skip_frames, duration=self.duration, outcome=outcome)
should_track_dropped = (
transaction.tracer._agent.check_server_version(gte=(7, 16)) if transaction.tracer._agent else True
)
if should_track_dropped and isinstance(span, DroppedSpan) and span.context:
transaction.track_dropped_span(span)
if exc_val and not isinstance(span, DroppedSpan):
try:
exc_val._elastic_apm_span_id = span.id
except AttributeError:
# could happen if the exception has __slots__
pass
except LookupError:
logger.debug("ended non-existing span %s of type %s", self.name, self.type)
def label(**labels) -> None:
"""
Labels current transaction. Keys should be strings, values can be strings, booleans,
or numerical values (int, float, Decimal)
:param labels: key/value map of labels
"""
transaction = execution_context.get_transaction()
if not transaction:
client = elasticapm.get_client()
if not client or client.config.enabled:
error_logger.warning("Ignored labels %s. No transaction currently active.", ", ".join(labels.keys()))
else:
transaction.label(**labels)
def set_transaction_name(name: str, override: bool = True) -> None:
"""
Sets the name of the transaction
:param name: the name of the transaction
:param override: if set to False, the name is only set if no name has been set before
:return: None
"""
transaction = execution_context.get_transaction()
if not transaction:
return
if transaction.name is None or override:
transaction.name = str(name)
def set_transaction_result(result, override=True) -> None:
"""
Sets the result of the transaction. The result could be e.g. the HTTP status class (e.g "HTTP 5xx") for
HTTP requests, or "success"/"failure" for background tasks.
:param result: Details of the transaction result that should be set
:param override: if set to False, the name is only set if no name has been set before
:return: None
"""
transaction = execution_context.get_transaction()
if not transaction:
return
if transaction.result is None or override:
transaction.result = result
def set_transaction_outcome(outcome=None, http_status_code=None, override=True) -> None:
"""
Set the outcome of the transaction. This should only be done at the end of a transaction
after the outcome is determined.
If an invalid outcome is provided, an INFO level log message will be issued.
:param outcome: the outcome of the transaction. Allowed values are "success", "failure", "unknown". None is
allowed if a http_status_code is provided.
:param http_status_code: An integer value of the HTTP status code. If provided, the outcome will be determined
based on the status code: Success if the status is lower than 500, failure otherwise.
If both a valid outcome and an http_status_code is provided, the former is used
:param override: If set to False, the outcome will only be updated if its current value is None
:return: None
"""
transaction = execution_context.get_transaction()
if not transaction:
return
if http_status_code and outcome not in constants.OUTCOME:
try:
http_status_code = int(http_status_code)
outcome = constants.OUTCOME.SUCCESS if http_status_code < 500 else constants.OUTCOME.FAILURE
except ValueError:
logger.info('Invalid HTTP status %r provided, outcome set to "unknown"', http_status_code)
outcome = constants.OUTCOME.UNKNOWN
elif outcome not in constants.OUTCOME:
logger.info('Invalid outcome %r provided, outcome set to "unknown"', outcome)
outcome = constants.OUTCOME.UNKNOWN
if outcome and (transaction.outcome is None or override):
transaction.outcome = outcome
def get_transaction_id():
"""
Returns the current transaction ID
"""
transaction = execution_context.get_transaction()
if not transaction:
return
return transaction.id
def get_trace_parent_header():
"""
Return the trace parent header for the current transaction.
"""
transaction = execution_context.get_transaction()
if not transaction or not transaction.trace_parent:
return
return transaction.trace_parent.to_string()
def get_trace_id():
"""
Returns the current trace ID
"""
transaction = execution_context.get_transaction()
if not transaction:
return
return transaction.trace_parent.trace_id if transaction.trace_parent else None
def get_span_id():
"""
Returns the current span ID
"""
span = execution_context.get_span()
if not span:
return
return span.id
def set_context(data, key="custom") -> None:
"""
Attach contextual data to the current transaction and errors that happen during the current transaction.
If the transaction is not sampled, this function becomes a no-op.
:param data: a dictionary, or a callable that returns a dictionary
:param key: the namespace for this data
"""
transaction = execution_context.get_transaction()
if not (transaction and transaction.is_sampled):
return
if callable(data):
data = data()
# remove invalid characters from key names
for k in list(data.keys()):
if LABEL_RE.search(k):
data[LABEL_RE.sub("_", k)] = data.pop(k)
if key in transaction.context:
transaction.context[key].update(data)
else:
transaction.context[key] = data
set_custom_context = functools.partial(set_context, key="custom")
def set_user_context(username=None, email=None, user_id=None) -> None:
data = {}
if username is not None:
data["username"] = encoding.keyword_field(username)
if email is not None:
data["email"] = encoding.keyword_field(email)
if user_id is not None:
data["id"] = encoding.keyword_field(user_id)
set_context(data, "user")