in src/sagemaker/telemetry/telemetry_logging.py [0:0]
def _telemetry_emitter(feature: str, func_name: str):
"""Telemetry Emitter
Decorator to emit telemetry logs for SageMaker Python SDK functions. This class needs
sagemaker_session object as a member. Default session object is a pysdk v2 Session object
in this repo. When collecting telemetry for classes using sagemaker-core Session object,
we should be aware of its differences, such as sagemaker_session.sagemaker_config does not
exist in new Session class.
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
sagemaker_session = None
if len(args) > 0 and hasattr(args[0], "sagemaker_session"):
# Get the sagemaker_session from the instance method args
sagemaker_session = args[0].sagemaker_session
elif feature == Feature.REMOTE_FUNCTION:
# Get the sagemaker_session from the function keyword arguments for remote function
sagemaker_session = kwargs.get(
"sagemaker_session", _get_default_sagemaker_session()
)
if sagemaker_session:
logger.debug("sagemaker_session found, preparing to emit telemetry...")
logger.info(TELEMETRY_OPT_OUT_MESSAGING)
response = None
caught_ex = None
studio_app_type = process_studio_metadata_file()
# Check if telemetry is opted out
telemetry_opt_out_flag = resolve_value_from_config(
direct_input=None,
config_path=TELEMETRY_OPT_OUT_PATH,
default_value=False,
sagemaker_session=sagemaker_session,
)
logger.debug("TelemetryOptOut flag is set to: %s", telemetry_opt_out_flag)
# Construct the feature list to track feature combinations
feature_list: List[int] = [FEATURE_TO_CODE[str(feature)]]
if (
hasattr(sagemaker_session, "sagemaker_config")
and sagemaker_session.sagemaker_config
and feature != Feature.SDK_DEFAULTS
):
feature_list.append(FEATURE_TO_CODE[str(Feature.SDK_DEFAULTS)])
if (
hasattr(sagemaker_session, "local_mode")
and sagemaker_session.local_mode
and feature != Feature.LOCAL_MODE
):
feature_list.append(FEATURE_TO_CODE[str(Feature.LOCAL_MODE)])
# Construct the extra info to track platform and environment usage metadata
extra = (
f"{func_name}"
f"&x-sdkVersion={SDK_VERSION}"
f"&x-env={PYTHON_VERSION}"
f"&x-sys={OS_NAME_VERSION}"
f"&x-platform={studio_app_type}"
)
# Add endpoint ARN to the extra info if available
if hasattr(sagemaker_session, "endpoint_arn") and sagemaker_session.endpoint_arn:
extra += f"&x-endpointArn={sagemaker_session.endpoint_arn}"
start_timer = perf_counter()
try:
# Call the original function
response = func(*args, **kwargs)
stop_timer = perf_counter()
elapsed = stop_timer - start_timer
extra += f"&x-latency={round(elapsed, 2)}"
if not telemetry_opt_out_flag:
_send_telemetry_request(
STATUS_TO_CODE[str(Status.SUCCESS)],
feature_list,
sagemaker_session,
None,
None,
extra,
)
except Exception as e: # pylint: disable=W0703
stop_timer = perf_counter()
elapsed = stop_timer - start_timer
extra += f"&x-latency={round(elapsed, 2)}"
if not telemetry_opt_out_flag:
_send_telemetry_request(
STATUS_TO_CODE[str(Status.FAILURE)],
feature_list,
sagemaker_session,
str(e),
e.__class__.__name__,
extra,
)
caught_ex = e
finally:
if caught_ex:
raise caught_ex
return response # pylint: disable=W0150
else:
logger.debug(
"Unable to send telemetry for function %s. "
"sagemaker_session is not provided or not valid.",
func_name,
)
return func(*args, **kwargs)
return wrapper
return decorator