in aws_advanced_python_wrapper/custom_endpoint_plugin.py [0:0]
def _run(self):
    """Poll the custom endpoint's definition until the monitor is stopped.

    Each pass calls ``describe_db_cluster_endpoints`` for ``self._endpoint_id``
    (filtered to custom endpoints). When exactly one endpoint is returned and
    its info differs from the cached entry, the plugin service's
    allowed/blocked hosts are replaced, the cache entry is refreshed and the
    change counter is incremented. The loop then sleeps for whatever is left
    of ``self._refresh_rate_ns``.

    An ``InterruptedError`` ends monitoring; any other exception is logged and
    polling resumes after one refresh interval. On exit the cache entry is
    removed, the stop event is set and the client is closed.
    """
    host = self._custom_endpoint_host_info.host
    logger.debug("CustomEndpointMonitor.StartingMonitor", host)

    try:
        while not self._stop_event.is_set():
            start_ns = perf_counter_ns()
            try:
                response = self._client.describe_db_cluster_endpoints(
                    DBClusterEndpointIdentifier=self._endpoint_id,
                    Filters=[
                        {
                            "Name": "db-cluster-endpoint-type",
                            "Values": ["custom"]
                        }
                    ]
                )

                endpoints = response["DBClusterEndpoints"]
                if len(endpoints) != 1:
                    endpoint_hostnames = [endpoint["Endpoint"] for endpoint in endpoints]
                    logger.warning(
                        "CustomEndpointMonitor.UnexpectedNumberOfEndpoints",
                        self._endpoint_id,
                        self._region,
                        len(endpoints),
                        endpoint_hostnames)
                    sleep(self._refresh_rate_ns / 1_000_000_000)
                    continue

                endpoint_info = CustomEndpointInfo.from_db_cluster_endpoint(endpoints[0])
                cached_info = CustomEndpointMonitor._custom_endpoint_info_cache.get(host)
                unchanged = cached_info is not None and cached_info == endpoint_info
                if not unchanged:
                    logger.debug(
                        "CustomEndpointMonitor.DetectedChangeInCustomEndpointInfo",
                        host, endpoint_info)

                    # The custom endpoint info has changed, so we need to update
                    # the set of allowed/blocked hosts.
                    hosts = AllowedAndBlockedHosts(
                        endpoint_info.static_members, endpoint_info.excluded_members)
                    self._plugin_service.allowed_and_blocked_hosts = hosts
                    CustomEndpointMonitor._custom_endpoint_info_cache.put(
                        host,
                        endpoint_info,
                        CustomEndpointMonitor._CUSTOM_ENDPOINT_INFO_EXPIRATION_NS)
                    self._info_changed_counter.inc()

                # Sleep only for the part of the refresh interval not already
                # spent on this pass.
                elapsed_ns = perf_counter_ns() - start_ns
                sleep_duration_ns = max(0, self._refresh_rate_ns - elapsed_ns)
                sleep(sleep_duration_ns / 1_000_000_000)
            except InterruptedError:
                raise
            except Exception as e:
                # If the exception is not an InterruptedError, log it and continue monitoring.
                logger.error("CustomEndpointMonitor.Exception", host, e)
                # Wait one refresh interval before retrying; without this a
                # persistent failure would make the loop spin, issuing
                # back-to-back API calls and flooding the log.
                sleep(self._refresh_rate_ns / 1_000_000_000)
    except InterruptedError:
        logger.info("CustomEndpointMonitor.Interrupted", host)
    finally:
        CustomEndpointMonitor._custom_endpoint_info_cache.remove(host)
        self._stop_event.set()
        self._client.close()
        logger.debug("CustomEndpointMonitor.StoppedMonitor", host)