in aws_advanced_python_wrapper/host_monitoring_plugin.py [0:0]
def execute(self, target: object, method_name: str, execute_func: Callable, *args: Any, **kwargs: Any) -> Any:
    """Run ``execute_func`` and, when applicable, monitor the host while it runs.

    Monitoring is only started when the ``FAILURE_DETECTION_ENABLED`` property
    is true and the plugin service reports ``method_name`` as a network-bound
    method; otherwise ``execute_func`` is called directly.

    NOTE(review): the block nesting below was reconstructed from a view of the
    source in which leading indentation was not preserved. Confirm the nesting
    inside the ``finally`` clause against the canonical file.

    Args:
        target: Not referenced in this method body.
        method_name: Name of the method being executed; used for the
            network-bound check and in log/error messages.
        execute_func: Zero-argument callable that performs the actual work.
        *args: Not referenced in this method body.
        **kwargs: Not referenced in this method body.

    Returns:
        Whatever ``execute_func()`` returns.

    Raises:
        AwsWrapperError: If the plugin service has no current connection or no
            current host info, or if, after monitoring, the monitor context
            reports the host as unavailable and the connection was not
            already closed.
    """
    # Both the current connection and the current host info are required
    # up front; fail fast with a method-specific message if either is missing.
    connection = self._plugin_service.current_connection
    if connection is None:
        raise AwsWrapperError(Messages.get_formatted("HostMonitoringPlugin.ConnectionNone", method_name))
    host_info = self._plugin_service.current_host_info
    if host_info is None:
        raise AwsWrapperError(Messages.get_formatted("HostMonitoringPlugin.HostInfoNoneForMethod", method_name))
    # Bypass monitoring entirely when failure detection is disabled or the
    # method is not network bound.
    is_enabled = WrapperProperties.FAILURE_DETECTION_ENABLED.get_bool(self._props)
    if not is_enabled or not self._plugin_service.is_network_bound_method(method_name):
        return execute_func()
    # Failure-detection settings read from the connection properties and
    # handed to the monitor service unchanged.
    failure_detection_time_ms = WrapperProperties.FAILURE_DETECTION_TIME_MS.get_int(self._props)
    failure_detection_interval = WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.get_int(self._props)
    failure_detection_count = WrapperProperties.FAILURE_DETECTION_COUNT.get_int(self._props)
    # Initialised before the try so the finally clause can tell whether
    # start_monitoring() produced a context.
    monitor_context = None
    result = None
    try:
        # presumably the first argument is a message key resolved by the
        # project's logger, with method_name as its format argument -- confirm
        logger.debug("HostMonitoringPlugin.ActivatedMonitoring", method_name)
        # NOTE(review): _get_monitoring_host_info() is invoked twice while
        # building these arguments; it could be hoisted into a local if the
        # helper is idempotent -- verify before changing.
        monitor_context = self._monitor_service.start_monitoring(
            connection,
            self._get_monitoring_host_info().all_aliases,
            self._get_monitoring_host_info(),
            self._props,
            failure_detection_time_ms,
            failure_detection_interval,
            failure_detection_count
        )
        result = execute_func()
    finally:
        # Runs whether execute_func() returned or raised. Nothing to clean up
        # if start_monitoring() never returned a (truthy) context.
        if monitor_context:
            # Stopping the monitor and inspecting its verdict are done while
            # holding self._lock.
            with self._lock:
                self._monitor_service.stop_monitoring(monitor_context)
                if monitor_context.is_host_unavailable():
                    # Record the monitored host's aliases as unavailable.
                    self._plugin_service.set_availability(
                        self._get_monitoring_host_info().all_aliases, HostAvailability.UNAVAILABLE)
                    driver_dialect = self._plugin_service.driver_dialect
                    if driver_dialect is not None and not driver_dialect.is_closed(connection):
                        # Best-effort close: any error from close() is
                        # deliberately ignored because the AwsWrapperError
                        # below is raised regardless.
                        try:
                            connection.close()
                        except Exception:
                            pass
                        # Raising inside ``finally`` replaces any exception
                        # already propagating from execute_func(); Python
                        # keeps the earlier one as __context__.
                        # NOTE(review): reconstructed as nested under the
                        # "connection still open" check -- confirm.
                        raise AwsWrapperError(
                            Messages.get_formatted("HostMonitoringPlugin.UnavailableHost", host_info.as_alias()))
            # NOTE(review): reconstructed as inside ``if monitor_context`` but
            # outside the lock -- confirm.
            logger.debug("HostMonitoringPlugin.MonitoringDeactivated", method_name)
    return result