in fbpcp/decorator/metrics.py [0:0]
def error_counter(metrics_name: str) -> Callable:
    """Build a method decorator that counts failures under ``metrics_name``.

    The decorated callable is invoked with its first positional argument as
    ``self``; that object must provide ``has_metrics()`` and
    ``get_metrics()``. When the callable raises an ``Exception`` and
    ``self.has_metrics()`` is truthy, ``self.get_metrics().count(metrics_name, 1)``
    is called and the very same exception is then re-raised to the caller.
    Successful calls are passed through untouched.

    Args:
        metrics_name: Name handed to ``count`` for every recorded failure.

    Returns:
        A decorator. It yields an ``async`` wrapper when the decorated
        callable is a coroutine function (per ``asyncio.iscoroutinefunction``)
        and a plain wrapper otherwise.

    Raises:
        Nothing by itself; the wrappers re-raise whatever the decorated
        callable raised.
    """

    def wrap(f: Callable):
        def _count_failure(self: MetricsGetter) -> None:
            # Shared by both wrappers so the sync and async paths cannot drift.
            if self.has_metrics():
                self.get_metrics().count(metrics_name, 1)

        @functools.wraps(f)
        def wrapper_sync(self: MetricsGetter, *args, **kwargs):
            try:
                return f(self, *args, **kwargs)
            except Exception:
                _count_failure(self)
                # Bare ``raise`` re-raises the active exception without adding
                # a second traceback entry for this wrapper frame.
                raise

        @functools.wraps(f)
        async def wrapper_async(self: MetricsGetter, *args, **kwargs):
            try:
                return await f(self, *args, **kwargs)
            except Exception:
                _count_failure(self)
                # Same reasoning as in ``wrapper_sync``.
                raise

        # Pick the wrapper whose calling convention matches the decorated callable.
        return wrapper_async if asyncio.iscoroutinefunction(f) else wrapper_sync

    return wrap