samcli/lib/telemetry/event.py (221 lines of code) (raw):
"""
Represents Events and their values.
"""
import logging
import threading
from datetime import datetime
from enum import Enum
from typing import List, Optional
from uuid import UUID, uuid4
from samcli.cli.context import Context
from samcli.lib.build.workflows import ALL_CONFIGS
from samcli.lib.config.file_manager import FILE_MANAGER_MAPPER
from samcli.lib.remote_invoke.remote_invoke_executors import RemoteInvokeEventType
from samcli.lib.telemetry.telemetry import Telemetry
from samcli.local.common.runtime_template import INIT_RUNTIMES
LOG = logging.getLogger(__name__)
class EventName(Enum):
"""Enum for the names of available events to track."""
USED_FEATURE = "UsedFeature"
BUILD_FUNCTION_RUNTIME = "BuildFunctionRuntime"
SYNC_USED = "SyncUsed"
SYNC_FLOW_START = "SyncFlowStart"
SYNC_FLOW_END = "SyncFlowEnd"
BUILD_WORKFLOW_USED = "BuildWorkflowUsed"
CONFIG_FILE_EXTENSION = "SamConfigFileExtension"
HOOK_CONFIGURATIONS_USED = "HookConfigurationsUsed"
REMOTE_INVOKE_EVENT_TYPE = "RemoteInvokeEventType"
class UsedFeature(Enum):
"""Enum for the names of event values of UsedFeature"""
ACCELERATE = "Accelerate"
CDK = "CDK"
INIT_WITH_APPLICATION_INSIGHTS = "InitWithApplicationInsights"
CFNLint = "CFNLint"
INVOKED_CUSTOM_LAMBDA_AUTHORIZERS = "InvokedLambdaAuthorizers"
BUILD_IN_SOURCE = "BuildInSource"
class EventType:
"""Class for Events and the types of values they may have."""
_SYNC_FLOWS = [
"AliasVersionSyncFlow",
"AutoDependencyLayerSyncFlow",
"AutoDependencyLayerParentSyncFlow",
"FunctionSyncFlow",
"FunctionLayerReferenceSync",
"GenericApiSyncFlow",
"HttpApiSyncFlow",
"ImageFunctionSyncFlow",
"LayerSyncFlow",
"RestApiSyncFlow",
"StepFunctionsSyncFlow",
"ZipFunctionSyncFlow",
"InfraSyncExecute",
"SkipInfraSyncExecute",
]
_WORKFLOWS = [f"{config.language}-{config.dependency_manager}" for config in ALL_CONFIGS]
_HOOK_CONFIGURATIONS = [
"TerraformPlanFile",
]
_event_values = { # Contains allowable values for Events
EventName.USED_FEATURE: [event.value for event in UsedFeature],
EventName.BUILD_FUNCTION_RUNTIME: INIT_RUNTIMES,
EventName.SYNC_USED: [
"Start",
"End",
],
EventName.SYNC_FLOW_START: _SYNC_FLOWS,
EventName.SYNC_FLOW_END: _SYNC_FLOWS,
EventName.BUILD_WORKFLOW_USED: _WORKFLOWS,
EventName.CONFIG_FILE_EXTENSION: list(FILE_MANAGER_MAPPER.keys()),
EventName.HOOK_CONFIGURATIONS_USED: _HOOK_CONFIGURATIONS,
EventName.REMOTE_INVOKE_EVENT_TYPE: list(RemoteInvokeEventType.get_possible_values()),
}
@staticmethod
def get_accepted_values(event_name: EventName) -> List[str]:
"""Get all acceptable values for a given Event name."""
if event_name not in EventType._event_values:
return []
return EventType._event_values[event_name]
class Event:
"""Class to represent Events that occur in SAM CLI."""
event_name: EventName
event_value: str # Validated by EventType.get_accepted_values to never be an arbitrary string
thread_id: Optional[UUID] # The thread ID; used to group Events from the same command run
time_stamp: str
exception_name: Optional[str]
def __init__(
self, event_name: str, event_value: str, thread_id: Optional[UUID] = None, exception_name: Optional[str] = None
):
Event._verify_event(event_name, event_value)
self.event_name = EventName(event_name)
self.event_value = event_value
if not thread_id:
thread_id = uuid4()
self.thread_id = thread_id
self.time_stamp = str(datetime.utcnow())[:-3] # format microseconds from 6 -> 3 figures to allow SQL casting
self.exception_name = exception_name
def __eq__(self, other):
return (
self.event_name == other.event_name
and self.event_value == other.event_value
and self.exception_name == other.exception_name
)
def __repr__(self):
return (
f"Event(event_name={self.event_name.value}, "
f"event_value={self.event_value}, "
f"thread_id={self.thread_id.hex}, "
f"time_stamp={self.time_stamp})",
f"exception_name={self.exception_name})",
)
def to_json(self):
return {
"event_name": self.event_name.value,
"event_value": self.event_value,
"thread_id": self.thread_id.hex,
"time_stamp": self.time_stamp,
"exception_name": self.exception_name,
}
@staticmethod
def _verify_event(event_name: str, event_value: str) -> None:
"""Raise an EventCreationError if either the event name or value is not valid."""
if event_name not in Event._get_event_names():
raise EventCreationError(f"Event '{event_name}' does not exist.")
if event_value not in EventType.get_accepted_values(EventName(event_name)):
raise EventCreationError(f"Event '{event_name}' does not accept value '{event_value}'.")
@staticmethod
def _get_event_names() -> List[str]:
"""Retrieves a list of all valid event names."""
return [event.value for event in EventName]
class EventTracker:
"""Class to track and recreate Events as they occur."""
_events: List[Event] = []
_event_lock = threading.Lock()
_session_id: Optional[str] = None
_command_name: Optional[str] = None
MAX_EVENTS: int = 50 # Maximum number of events to store before sending
@staticmethod
def track_event(
event_name: str,
event_value: str,
session_id: Optional[str] = None,
thread_id: Optional[UUID] = None,
exception_name: Optional[str] = None,
):
"""Method to track an event where and when it occurs.
Place this method in the codepath of the event that you would
like to track. For instance, if you would like to track when
FeatureX is used, append this method to the end of that function.
Parameters
----------
event_name: str
The name of the Event. Must be a valid EventName value, or an
EventCreationError will be thrown.
event_value: str
The value of the Event. Must be a valid EventType value for the
passed event_name, or an EventCreationError will be thrown.
session_id: Optional[str]
The session ID to set to link back to the original command run
thread_id: Optional[UUID]
The thread ID of the Event to track, as a UUID.
exception_name: Optional[str]
The name of the exception that this event encountered when tracking a feature
Examples
--------
>>> def feature_x(...):
# do things
EventTracker.track_event("UsedFeature", "FeatureX")
>>> def feature_y(...) -> Any:
# do things
EventTracker.track_event("UsedFeature", "FeatureY")
return some_value
"""
if session_id:
EventTracker._session_id = session_id
try:
should_send: bool = False
# Validate the thread ID
if not thread_id: # Passed value is not a UUID or None
thread_id = uuid4()
with EventTracker._event_lock:
EventTracker._events.append(
Event(event_name, event_value, thread_id=thread_id, exception_name=exception_name)
)
# Get properties from the click context (needed for multithreading sending)
EventTracker._set_context_property("_session_id", "session_id")
EventTracker._set_context_property("_command_name", "command_path")
if len(EventTracker._events) >= EventTracker.MAX_EVENTS:
should_send = True
if should_send:
EventTracker.send_events()
except EventCreationError as e:
LOG.debug("Error occurred while trying to track an event: %s", e)
@staticmethod
def get_tracked_events() -> List[Event]:
"""Retrieve a list of all currently tracked Events."""
with EventTracker._event_lock:
return EventTracker._events
@staticmethod
def clear_trackers():
"""Clear the current list of tracked Events before the next session."""
with EventTracker._event_lock:
EventTracker._events = []
@staticmethod
def send_events() -> threading.Thread:
"""Call a thread to send the current list of Events via Telemetry."""
send_thread = threading.Thread(target=EventTracker._send_events_in_thread)
send_thread.start()
return send_thread
@staticmethod
def _set_context_property(event_prop: str, context_prop: str) -> None:
"""
Set a click context property in the event so that it is emitted when the metric is sent.
This is required since the event is sent in a separate thread and no longer has access
to the click context that the command was initially called with. As a workaround, we set
the property here first so that it's available when calling the metrics endpoint.
Parameters
----------
event_prop: str
Property name to be stored in the event and consumed when emitting the metric
context_prop: str
Property name for the target property from the context object
"""
if not getattr(EventTracker, event_prop):
try:
ctx = Context.get_current_context()
if ctx:
setattr(EventTracker, event_prop, getattr(ctx, context_prop))
except RuntimeError:
LOG.debug("EventTracker: Unable to obtain %s", context_prop)
@staticmethod
def _send_events_in_thread():
"""Send the current list of Events via Telemetry."""
from samcli.lib.telemetry.metric import Metric # pylint: disable=cyclic-import
msa = {}
with EventTracker._event_lock:
if not EventTracker._events: # Don't do anything if there are no events to send
return
msa["events"] = [e.to_json() for e in EventTracker._events]
EventTracker._events = [] # Manual clear_trackers() since we're within the lock
telemetry = Telemetry()
metric = Metric("events")
metric.add_data("sessionId", EventTracker._session_id)
metric.add_data("commandName", EventTracker._command_name)
metric.add_data("metricSpecificAttributes", msa)
telemetry.emit(metric)
def track_long_event(
start_event_name: str,
start_event_value: str,
end_event_name: str,
end_event_value: str,
thread_id: Optional[UUID] = None,
):
"""Decorator for tracking events that occur at start and end of a function.
The decorator tracks two Events total, where the first Event occurs
at the start of the decorated function's execution (prior to its
first line) and the second Event occurs after the function has ended
(after the final line of the function has executed).
If this decorator is being placed in a function that also contains the
`track_command` decorator, ensure that this decorator is placed BELOW
`track_command`. Otherwise, the current list of Events will be sent
before the end_event will be added, resulting in an additional 'events'
metric with only that single Event.
Parameters
----------
start_event_name: str
The name of the Event that is executed at the start of the
decorated function's execution. Must be a valid EventName
value or the decorator will not run.
start_event_value: str
The value of the Event that is executed at the start of the
decorated function's execution. Must be a valid EventType
value for the passed `start_event_name` or the decorator
will not run.
end_event_name: str
The name of the Event that is executed at the end of the
decorated function's execution. Must be a valid EventName
value or the decorator will not run.
end_event_value: str
The value of the Event that is executed at the end of the
decorated function's execution. Must be a valid EventType
value for the passed `end_event_name` or the decorator
will not run.
thread_id: Optional[UUID]
The thread ID of the Events to track, as a UUID.
Examples
--------
>>> @track_long_event("FuncStart", "Func1", "FuncEnd", "Func1")
def func1(...):
# do things
>>> @track_long_event("FuncStart", "Func2", "FuncEnd", "Func2")
def func2(...):
# do things
"""
should_track = True
try:
# Check that passed values are valid Events
Event(start_event_name, start_event_value)
Event(end_event_name, end_event_value)
# Validate the thread ID
if not thread_id: # Passed value is not a UUID or None
thread_id = uuid4()
except EventCreationError as e:
LOG.debug("Error occurred while trying to track an event: %s\nDecorator not run.", e)
should_track = False
def decorator_for_events(func):
"""The actual decorator"""
def wrapped(*args, **kwargs):
# Track starting event
if should_track:
EventTracker.track_event(start_event_name, start_event_value, thread_id=thread_id)
exception = None
# Run the function
try:
return_value = func(*args, **kwargs)
except Exception as e:
exception = e
# Track ending event
if should_track:
EventTracker.track_event(end_event_name, end_event_value, thread_id=thread_id)
EventTracker.send_events() # Ensure Events are sent at the end of execution
if exception:
raise exception
return return_value
return wrapped
return decorator_for_events
class EventCreationError(Exception):
"""Exception called when an Event is not properly created."""