images/airflow/2.9.2/python/mwaa/logging/utils.py (46 lines of code) (raw):
"""A module containing various utility functions related to logging."""
# Python imports
from functools import wraps
from typing import Callable, Any, TypeVar
import time
def parse_arn(log_group_arn: str):
"""
Extract the log group and region name from a log group ARN.
:param log_group_arn: The ARN of the log group.
:return: A tuple containing the log group name and the region name.
"""
try:
split_arn = log_group_arn.split(":")
log_group = split_arn[6]
region_name = split_arn[3] if split_arn[3] else None
return log_group, region_name
except Exception as ex:
raise RuntimeError(f"Invalid log group ARN: {log_group_arn}") from ex
F = TypeVar("F", bound=Callable[..., Any])
def throttle(
seconds: float,
log_throttling_msg: bool = False,
instance_level_throttling: bool = False,
) -> Callable[[F], F]:
"""
Add a throttling functionality to a function.
This decorator limits the rate at which a function can be called. If the function
is called more than once within the specified number of seconds, it will be
throttled and will not execute again until the time limit has passed.
IMPORTANT NOTE: If you apply this decorator to a class method, the throttling will
happen at the class level by default. If you need to apply a throttling at the
instance level, i.e. calling the same method on multiple instances will count
as one call per each instance, then you must set the `instance_level_throttling'
parameter to True.
:param seconds (float): The number of seconds to wait between function calls.
:param log_throttling_msg (bool): If true, a message will be printed in case the
call to the function gets throttled. If false, the function will be silently
throttled. You probably want to set this to True if a function is not called
frequently, and thus will not result in log pollution. However, for a functions
that gets called fairly regularly, it is better to set this to False.
:param instance_level_throttling (bool): If true, throttling will be applied per
instance method. If false, throttling will be applied globally for the function.
:return A decorated function that will enforce the throttling.
"""
def decorator(func: F) -> F:
last_called_global = [0.0]
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
if instance_level_throttling:
self = args[0]
if not hasattr(self, "_last_called"):
setattr(self, "_last_called", {})
last_called = self._last_called.get(func.__name__, 0.0)
else:
last_called = last_called_global[0]
current_time = time.time()
elapsed = current_time - last_called
if elapsed < seconds:
wait_time = seconds - elapsed
if log_throttling_msg:
print(
f"Throttling {func.__name__} for {wait_time:.2f} more seconds."
)
return None
else:
if instance_level_throttling:
self = args[0]
self._last_called[func.__name__] = current_time
else:
last_called_global[0] = current_time
return func(*args, **kwargs)
return wrapper # type: ignore
return decorator