in elasticapm/contrib/opentelemetry/span.py [0:0]
def _set_types(self) -> None:
"""
Set the types and subtypes for the underlying Elastic transaction/span
"""
if isinstance(self.elastic_span, elasticapm.traces.Transaction):
attributes = self.elastic_span.context.get("otel_attributes", {})
span_kind = self.elastic_span.context["otel_spankind"]
is_rpc = bool(attributes.get("rpc.system"))
is_http = bool(attributes.get("http.url")) or bool(attributes.get("http.scheme"))
is_messaging = bool(attributes.get("messaging.system"))
if span_kind == SpanKind.SERVER.name and (is_rpc or is_http):
transaction_type = "request"
elif span_kind == SpanKind.CONSUMER.name and is_messaging:
transaction_type = "messaging"
else:
transaction_type = "unknown"
self.elastic_span.transaction_type = transaction_type
else:
attributes = self.elastic_span.context.get("otel_attributes", {})
span_type = None
span_subtype = None
resource = None
def http_port_from_scheme(scheme: str):
if scheme == "http":
return 80
elif scheme == "https":
return 443
return None
def parse_net_name(url: str):
u = urllib.parse.urlparse(url)
if u.port:
return u.netloc
else:
port = http_port_from_scheme(u.scheme)
return u.netloc if not port else "{}:{}".format(u.netloc, port)
net_port = attributes.get("net.peer.port", -1)
net_name = net_peer = attributes.get("net.peer.name", attributes.get("net.peer.ip", ""))
if net_name and (net_port > 0):
net_name = f"{net_name}:{net_port}"
if attributes.get("db.system"):
span_type = "db"
span_subtype = attributes.get("db.system")
resource = net_name or span_subtype
if attributes.get("db.name"):
resource = "{}/{}".format(resource, attributes.get("db.name"))
elif attributes.get("messaging.system"):
span_type = "messaging"
span_subtype = attributes.get("messaging.system")
if not net_name and attributes.get("messaging.url"):
net_name = parse_net_name(attributes.get("messaging.url"))
resource = net_name or span_subtype
if attributes.get("messaging.destination"):
resource = "{}/{}".format(resource, attributes.get("messaging.destination"))
elif attributes.get("rpc.system"):
span_type = "external"
span_subtype = attributes.get("rpc.system")
resource = net_name or span_subtype
if attributes.get("rpc.service"):
resource = "{}/{}".format(resource, attributes.get("rpc.service"))
elif attributes.get("http.url") or attributes.get("http.scheme"):
span_type = "external"
span_subtype = "http"
http_host = attributes.get("http.host", net_peer)
if http_host:
if net_port < 0:
net_port = http_port_from_scheme(attributes.get("http.scheme"))
resource = http_host if net_port < 0 else f"{http_host}:{net_port}"
elif attributes.get("http.url"):
resource = parse_net_name(attributes["http.url"])
if not span_type:
span_kind = self.elastic_span.context["otel_spankind"]
if span_kind == SpanKind.INTERNAL.name:
span_type = "app"
span_subtype = "internal"
else:
span_type = "unknown"
self.elastic_span.type = span_type
self.elastic_span.subtype = span_subtype
if resource:
if "destination" not in self.elastic_span.context:
self.elastic_span.context["destination"] = {"service": {"resource": resource}}
elif "service" not in self.elastic_span.context["destination"]:
self.elastic_span.context["destination"]["service"] = {"resource": resource}
else:
self.elastic_span.context["destination"]["service"]["resource"] = resource