in aws_advanced_python_wrapper/plugin_service.py [0:0]
def execute(self, target: object, method_name: str, target_driver_func: Callable, *args, **kwargs) -> Any:
plugin_service = self._container.plugin_service
driver_dialect = plugin_service.driver_dialect
conn: Optional[Connection] = driver_dialect.get_connection_from_obj(target)
current_conn: Optional[Connection] = driver_dialect.unwrap_connection(plugin_service.current_connection)
if method_name not in ["Connection.close", "Cursor.close"] and conn is not None and conn != current_conn:
raise AwsWrapperError(Messages.get_formatted("PluginManager.MethodInvokedAgainstOldConnection", target))
if conn is None and method_name in ["Connection.close", "Cursor.close"]:
return
context: TelemetryContext
context = self._telemetry_factory.open_telemetry_context(method_name, TelemetryTraceLevel.TOP_LEVEL)
context.set_attribute("python_call", method_name)
try:
result = self._execute_with_subscribed_plugins(
method_name,
# next_plugin_func is defined later in make_pipeline
lambda plugin, next_plugin_func: plugin.execute(target, method_name, next_plugin_func, *args, **kwargs),
target_driver_func)
context.set_success(True)
return result
except Exception as e:
context.set_success(False)
raise e
finally:
context.close_context()