def _telemetry_emitter()

in src/sagemaker/telemetry/telemetry_logging.py [0:0]


def _telemetry_emitter(feature: str, func_name: str):
    """Build a decorator that emits telemetry for a SageMaker Python SDK function.

    The wrapped callable is always invoked exactly once and its return value or
    exception is passed through to the caller. Telemetry is only emitted when a
    truthy ``sagemaker_session`` can be located:

    * from ``args[0].sagemaker_session`` when the first positional argument has
      that attribute (i.e. the decorated callable is an instance method), or
    * for ``Feature.REMOTE_FUNCTION``, from the ``sagemaker_session`` keyword
      argument, falling back to ``_get_default_sagemaker_session()`` when the
      keyword is absent.

    The default session object is a pysdk v2 Session object in this repo. When
    collecting telemetry for classes using the sagemaker-core Session object, be
    aware of its differences, such as ``sagemaker_session.sagemaker_config`` not
    existing on the new Session class; session attributes are therefore probed
    with ``hasattr`` before use.

    Args:
        feature: Feature being tracked; ``str(feature)`` is used as the key into
            ``FEATURE_TO_CODE``.
        func_name: Name recorded as the first component of the telemetry
            ``extra`` payload and used in debug logging.

    Returns:
        A decorator that wraps a callable with telemetry emission.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            sagemaker_session = None
            if len(args) > 0 and hasattr(args[0], "sagemaker_session"):
                # Get the sagemaker_session from the instance method args
                sagemaker_session = args[0].sagemaker_session
            elif feature == Feature.REMOTE_FUNCTION:
                # Get the sagemaker_session from the function keyword arguments for
                # remote function. The default session is only looked up when the
                # caller did not pass one: a dict.get() default would be evaluated
                # eagerly on every call.
                if "sagemaker_session" in kwargs:
                    sagemaker_session = kwargs["sagemaker_session"]
                else:
                    sagemaker_session = _get_default_sagemaker_session()

            if not sagemaker_session:
                logger.debug(
                    "Unable to send telemetry for function %s. "
                    "sagemaker_session is not provided or not valid.",
                    func_name,
                )
                return func(*args, **kwargs)

            logger.debug("sagemaker_session found, preparing to emit telemetry...")
            logger.info(TELEMETRY_OPT_OUT_MESSAGING)
            studio_app_type = process_studio_metadata_file()

            # Check if telemetry is opted out
            telemetry_opt_out_flag = resolve_value_from_config(
                direct_input=None,
                config_path=TELEMETRY_OPT_OUT_PATH,
                default_value=False,
                sagemaker_session=sagemaker_session,
            )
            logger.debug("TelemetryOptOut flag is set to: %s", telemetry_opt_out_flag)

            # Construct the feature list to track feature combinations
            feature_list: List[int] = [FEATURE_TO_CODE[str(feature)]]

            if (
                hasattr(sagemaker_session, "sagemaker_config")
                and sagemaker_session.sagemaker_config
                and feature != Feature.SDK_DEFAULTS
            ):
                feature_list.append(FEATURE_TO_CODE[str(Feature.SDK_DEFAULTS)])

            if (
                hasattr(sagemaker_session, "local_mode")
                and sagemaker_session.local_mode
                and feature != Feature.LOCAL_MODE
            ):
                feature_list.append(FEATURE_TO_CODE[str(Feature.LOCAL_MODE)])

            # Construct the extra info to track platform and environment usage metadata
            extra = (
                f"{func_name}"
                f"&x-sdkVersion={SDK_VERSION}"
                f"&x-env={PYTHON_VERSION}"
                f"&x-sys={OS_NAME_VERSION}"
                f"&x-platform={studio_app_type}"
            )

            # Add endpoint ARN to the extra info if available
            if hasattr(sagemaker_session, "endpoint_arn") and sagemaker_session.endpoint_arn:
                extra += f"&x-endpointArn={sagemaker_session.endpoint_arn}"

            start_timer = perf_counter()
            try:
                # Call the original function. Only this call is guarded so that an
                # error raised while sending telemetry is never mistaken for a
                # failure of the wrapped function.
                response = func(*args, **kwargs)
            except Exception as e:  # pylint: disable=W0703
                elapsed = perf_counter() - start_timer
                extra += f"&x-latency={round(elapsed, 2)}"
                if not telemetry_opt_out_flag:
                    _send_telemetry_request(
                        STATUS_TO_CODE[str(Status.FAILURE)],
                        feature_list,
                        sagemaker_session,
                        str(e),
                        e.__class__.__name__,
                        extra,
                    )
                # Bare ``raise`` keeps the original traceback. There is deliberately
                # no ``finally: return ...`` here: returning from ``finally`` would
                # discard in-flight exceptions such as KeyboardInterrupt/SystemExit.
                raise

            elapsed = perf_counter() - start_timer
            extra += f"&x-latency={round(elapsed, 2)}"
            if not telemetry_opt_out_flag:
                _send_telemetry_request(
                    STATUS_TO_CODE[str(Status.SUCCESS)],
                    feature_list,
                    sagemaker_session,
                    None,
                    None,
                    extra,
                )
            return response

        return wrapper

    return decorator