elasticapm/instrumentation/packages/azure.py (351 lines of code) (raw):
# BSD 3-Clause License
#
# Copyright (c) 2019, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import json
import urllib.parse
from collections import namedtuple
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
from elasticapm.traces import capture_span
from elasticapm.utils.logging import get_logger
logger = get_logger("elasticapm.instrument")
HandlerInfo = namedtuple("HandlerInfo", ("signature", "span_type", "span_subtype", "span_action", "context"))
class AzureInstrumentation(AbstractInstrumentedModule):
name = "azure"
instrument_list = [
("azure.core.pipeline._base", "Pipeline.run"),
("azure.cosmosdb.table.common._http.httpclient", "_HTTPClient.perform_request"),
]
def call(self, module, method, wrapped, instance, args, kwargs):
if len(args) == 1:
request = args[0]
else:
request = kwargs["request"]
if hasattr(request, "url"): # Azure Storage HttpRequest
parsed_url = urllib.parse.urlparse(request.url)
hostname = parsed_url.hostname
port = parsed_url.port
path = parsed_url.path
query_params = urllib.parse.parse_qs(parsed_url.query)
else: # CosmosDB HTTPRequest
hostname = request.host
port = hostname.split(":")[1] if ":" in hostname else 80
path = request.path
query_params = request.query
# Detect the service
service = None
if ".blob.core." in hostname:
service = "azureblob"
service_type = "storage"
elif ".queue.core." in hostname:
service = "azurequeue"
service_type = "messaging"
elif ".table.core." in hostname:
service = "azuretable"
service_type = "storage"
elif ".file.core." in hostname:
service = "azurefile"
service_type = "storage"
# Do not create a span if we don't recognize the service
if not service:
return wrapped(*args, **kwargs)
context = {
"destination": {
"address": hostname,
"port": port,
}
}
handler_info = handlers[service](request, hostname, path, query_params, service, service_type, context)
with capture_span(
handler_info.signature,
span_type=handler_info.span_type,
leaf=True,
span_subtype=handler_info.span_subtype,
span_action=handler_info.span_action,
extra=handler_info.context,
):
return wrapped(*args, **kwargs)
def handle_azureblob(request, hostname, path, query_params, service, service_type, context):
"""
Returns the HandlerInfo for Azure Blob Storage operations
"""
account_name = hostname.split(".")[0]
context["destination"]["service"] = {
"name": service,
"resource": "{}/{}".format(service, account_name),
"type": service_type,
}
method = request.method
headers = request.headers
blob = path[1:]
operation_name = "Unknown"
if method.lower() == "delete":
operation_name = "Delete"
elif method.lower() == "get":
operation_name = "Download"
if "container" in query_params.get("restype", []):
operation_name = "GetProperties"
if "acl" in query_params.get("comp", []):
operation_name = "GetAcl"
elif "list" in query_params.get("comp", []):
operation_name = "ListBlobs"
elif "metadata" in query_params.get("comp", []):
operation_name = "GetMetadata"
elif "list" in query_params.get("comp", []):
operation_name = "ListContainers"
elif "tags" in query_params.get("comp", []):
operation_name = "GetTags"
if query_params.get("where"):
operation_name = "FindTags"
elif "blocklist" in query_params.get("comp", []):
operation_name = "GetBlockList"
elif "pagelist" in query_params.get("comp", []):
operation_name = "GetPageRanges"
elif "stats" in query_params.get("comp", []):
operation_name = "Stats"
elif "blobs" in query_params.get("comp", []):
operation_name = "FilterBlobs"
elif method.lower() == "head":
operation_name = "GetProperties"
if "container" in query_params.get("restype", []) and query_params.get("comp") == "metadata":
operation_name = "GetMetadata"
elif "container" in query_params.get("restype", []) and query_params.get("comp") == "acl":
operation_name = "GetAcl"
elif method.lower() == "post":
if "batch" in query_params.get("comp", []):
operation_name = "Batch"
elif "query" in query_params.get("comp", []):
operation_name = "Query"
elif "userdelegationkey" in query_params.get("comp", []):
operation_name = "GetUserDelegationKey"
elif method.lower() == "put":
operation_name = "Create"
if "x-ms-copy-source" in headers:
operation_name = "Copy"
# These are repetitive and unnecessary, but included in case the table at
# https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-azure.md
# changes in the future
if "block" in query_params.get("comp", []):
operation_name = "Copy"
elif "page" in query_params.get("comp", []):
operation_name = "Copy"
elif "incrementalcopy" in query_params.get("comp", []):
operation_name = "Copy"
elif "appendblock" in query_params.get("comp", []):
operation_name = "Copy"
elif "x-ms-blob-type" in headers:
operation_name = "Upload"
elif "x-ms-page-write" in headers and query_params.get("comp") == "page":
operation_name = "Clear"
elif "copy" in query_params.get("comp", []):
operation_name = "Abort"
elif "block" in query_params.get("comp", []):
operation_name = "Upload"
elif "blocklist" in query_params.get("comp", []):
operation_name = "Upload"
elif "page" in query_params.get("comp", []):
operation_name = "Upload"
elif "appendblock" in query_params.get("comp", []):
operation_name = "Upload"
elif "metadata" in query_params.get("comp", []):
operation_name = "SetMetadata"
elif "container" in query_params.get("restype", []) and query_params.get("comp") == "acl":
operation_name = "SetAcl"
elif "properties" in query_params.get("comp", []):
operation_name = "SetProperties"
elif "lease" in query_params.get("comp", []):
operation_name = "Lease"
elif "snapshot" in query_params.get("comp", []):
operation_name = "Snapshot"
elif "undelete" in query_params.get("comp", []):
operation_name = "Undelete"
elif "tags" in query_params.get("comp", []):
operation_name = "SetTags"
elif "tier" in query_params.get("comp", []):
operation_name = "SetTier"
elif "expiry" in query_params.get("comp", []):
operation_name = "SetExpiry"
elif "seal" in query_params.get("comp", []):
operation_name = "Seal"
elif "rename" in query_params.get("comp", []):
operation_name = "Rename"
signature = "AzureBlob {} {}".format(operation_name, blob)
return HandlerInfo(signature, service_type, service, operation_name, context)
def handle_azurequeue(request, hostname, path, query_params, service, service_type, context):
"""
Returns the HandlerInfo for Azure Queue operations
"""
account_name = hostname.split(".")[0]
method = request.method
resource_name = path.split("/")[1] if "/" in path else account_name # /queuename/messages
context["destination"]["service"] = {
"name": service,
"resource": "{}/{}".format(service, resource_name),
"type": service_type,
}
operation_name = "UNKNOWN"
preposition = "to "
if method.lower() == "delete":
operation_name = "DELETE"
preposition = ""
if path.endswith("/messages") and "popreceipt" not in query_params:
operation_name = "CLEAR"
elif query_params.get("popreceipt", []):
# Redundant, but included in case the table at
# https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-azure.md
# changes in the future
operation_name = "DELETE"
preposition = "from "
elif method.lower() == "get":
operation_name = "RECEIVE"
preposition = "from "
if "list" in query_params.get("comp", []):
operation_name = "LISTQUEUES"
elif "properties" in query_params.get("comp", []):
operation_name = "GETPROPERTIES"
elif "stats" in query_params.get("comp", []):
operation_name = "STATS"
elif "metadata" in query_params.get("comp", []):
operation_name = "GETMETADATA"
elif "acl" in query_params.get("comp", []):
operation_name = "GETACL"
elif "true" in query_params.get("peekonly", []):
operation_name = "PEEK"
elif method.lower() == "head":
operation_name = "RECEIVE"
preposition = "from "
if "metadata" in query_params.get("comp", []):
operation_name = "GETMETADATA"
elif "acl" in query_params.get("comp", []):
operation_name = "GETACL"
elif method.lower() == "options":
operation_name = "PREFLIGHT"
preposition = "from "
elif method.lower() == "post":
operation_name = "SEND"
preposition = "to "
elif method.lower() == "put":
operation_name = "CREATE"
preposition = ""
if "metadata" in query_params.get("comp", []):
operation_name = "SETMETADATA"
preposition = "for "
elif "acl" in query_params.get("comp", []):
operation_name = "SETACL"
preposition = "for "
elif "properties" in query_params.get("comp", []):
operation_name = "SETPROPERTIES"
preposition = "for "
elif query_params.get("popreceipt", []):
operation_name = "UPDATE"
preposition = ""
# If `preposition` is included, it should have a trailing space
signature = "AzureQueue {} {}{}".format(operation_name, preposition, resource_name)
return HandlerInfo(signature, service_type, service, operation_name.lower(), context)
def handle_azuretable(request, hostname, path, query_params, service, service_type, context):
"""
Returns the HandlerInfo for Azure Table Storage operations
"""
account_name = hostname.split(".")[0]
method = request.method
body = request.body
try:
body = json.loads(body)
except json.decoder.JSONDecodeError: # str not bytes
body = {}
# /tablename(PartitionKey='<partition-key>',RowKey='<row-key>')
resource_name = path.split("/", 1)[1] if "/" in path else path
context["destination"]["service"] = {
"name": service,
"resource": "{}/{}".format(service, account_name),
"type": service_type,
}
operation_name = "Unknown"
if method.lower() == "put":
operation_name = "Update"
if "properties" in query_params.get("comp", []):
operation_name = "SetProperties"
elif "acl" in query_params.get("comp", []):
operation_name = "SetAcl"
elif method.lower() == "post":
if resource_name == "Tables":
resource_name = body.get("TableName", resource_name)
operation_name = "Create"
else: # /<tablename>
operation_name = "Insert"
elif method.lower() == "get":
operation_name = "Query" # for both /Tables and /table()
if "properties" in query_params.get("comp", []):
operation_name = "GetProperties"
elif "stats" in query_params.get("comp", []):
operation_name = "Stats"
elif "acl" in query_params.get("comp", []):
operation_name = "GetAcl"
elif method.lower() == "delete":
operation_name = "Delete"
if "Tables" in resource_name and "'" in resource_name:
resource_name = resource_name.split("'")[1] # /Tables('<table_name>')
elif method.lower() == "options":
operation_name = "Preflight"
elif method.lower() == "head" and "acl" in query_params.get("comp", []):
operation_name = "GetAcl"
elif method.lower() == "merge":
operation_name = "Merge"
signature = "AzureTable {} {}".format(operation_name, resource_name)
return HandlerInfo(signature, service_type, service, operation_name, context)
def handle_azurefile(request, hostname, path, query_params, service, service_type, context):
"""
Returns the HandlerInfo for Azure File Share Storage operations
"""
account_name = hostname.split(".")[0]
method = request.method
resource_name = path.split("/", 1)[1] if "/" in path else account_name
headers = request.headers
context["destination"]["service"] = {
"name": service,
"resource": "{}/{}".format(service, account_name),
"type": service_type,
}
operation_name = "Unknown"
if method.lower() == "get":
operation_name = "Download"
if "list" in query_params.get("comp", []):
operation_name = "List"
elif "properties" in query_params.get("comp", []):
operation_name = "GetProperties"
elif "share" in query_params.get("restype", []):
operation_name = "GetProperties"
elif "metadata" in query_params.get("comp", []):
operation_name = "GetMetadata"
elif "acl" in query_params.get("comp", []):
operation_name = "GetAcl"
elif "stats" in query_params.get("comp", []):
operation_name = "Stats"
elif "filepermission" in query_params.get("comp", []):
operation_name = "GetPermission"
elif "listhandles" in query_params.get("comp", []):
operation_name = "ListHandles"
elif "rangelist" in query_params.get("comp", []):
operation_name = "ListRanges"
elif method.lower() == "put":
operation_name = "Create"
if "properties" in query_params.get("comp", []):
operation_name = "SetProperties"
if "share" in query_params.get("restype", []):
operation_name = "SetProperties"
elif "snapshot" in query_params.get("comp", []):
operation_name = "Snapshot"
elif "metadata" in query_params.get("comp", []):
operation_name = "SetMetadata"
elif "undelete" in query_params.get("comp", []):
operation_name = "Undelete"
elif "acl" in query_params.get("comp", []):
operation_name = "SetAcl"
elif "filepermission" in query_params.get("comp", []):
operation_name = "SetPermission"
elif "directory" in query_params.get("restype", []):
operation_name = "Create"
elif "forceclosehandles" in query_params.get("comp", []):
operation_name = "CloseHandles"
elif "range" in query_params.get("comp", []):
operation_name = "Upload"
elif "x-ms-copy-source" in headers:
operation_name = "Copy"
elif "x-ms-copy-action" in headers and headers["x-ms-copy-action"] == "abort":
operation_name = "Abort"
elif "lease" in query_params.get("comp", []):
operation_name = "Lease"
elif method.lower() == "options":
operation_name = "Preflight"
elif method.lower() == "head":
operation_name = "GetProperties"
if "share" in query_params.get("restype", []):
operation_name = "GetProperties"
elif "metadata" in query_params.get("comp", []):
operation_name = "GetMetadata"
elif "acl" in query_params.get("comp", []):
operation_name = "GetAcl"
elif method.lower() == "delete":
operation_name = "Delete"
signature = "AzureFile {} {}".format(operation_name, resource_name)
return HandlerInfo(signature, service_type, service, operation_name, context)
handlers = {
"azureblob": handle_azureblob,
"azurequeue": handle_azurequeue,
"azuretable": handle_azuretable,
"azurefile": handle_azurefile,
}