def _run()

in aws_advanced_python_wrapper/custom_endpoint_plugin.py [0:0]


    def _run(self):
        """
        Poll the custom endpoint's info until the monitor is stopped.

        Each iteration calls ``describe_db_cluster_endpoints`` for ``self._endpoint_id`` (filtered to endpoints of
        type ``custom``). When exactly one endpoint is returned and its info differs from the cached entry, the
        plugin service's allowed/blocked hosts and the shared info cache are updated and the change counter is
        incremented. The loop then sleeps for whatever remains of ``self._refresh_rate_ns``.

        Errors other than ``InterruptedError`` are logged and the loop keeps running after waiting one refresh
        interval, so a persistent failure does not turn into a tight retry loop. ``InterruptedError`` ends the loop.

        On exit (normal or not) the cache entry for this endpoint host is removed, the stop event is set, and the
        client is closed.
        """
        logger.debug("CustomEndpointMonitor.StartingMonitor", self._custom_endpoint_host_info.host)

        try:
            while not self._stop_event.is_set():
                try:
                    start_ns = perf_counter_ns()

                    response = self._client.describe_db_cluster_endpoints(
                        DBClusterEndpointIdentifier=self._endpoint_id,
                        Filters=[
                            {
                                "Name": "db-cluster-endpoint-type",
                                "Values": ["custom"]
                            }
                        ]
                    )

                    endpoints = response["DBClusterEndpoints"]
                    if len(endpoints) != 1:
                        # Zero or multiple matches: we cannot tell which endpoint to track, so report and retry
                        # after a full refresh interval.
                        endpoint_hostnames = [endpoint["Endpoint"] for endpoint in endpoints]
                        logger.warning(
                            "CustomEndpointMonitor.UnexpectedNumberOfEndpoints",
                            self._endpoint_id,
                            self._region,
                            len(endpoints),
                            endpoint_hostnames)

                        sleep(self._refresh_rate_ns / 1_000_000_000)
                        continue

                    endpoint_info = CustomEndpointInfo.from_db_cluster_endpoint(endpoints[0])
                    cached_info = \
                        CustomEndpointMonitor._custom_endpoint_info_cache.get(self._custom_endpoint_host_info.host)
                    info_unchanged = cached_info is not None and cached_info == endpoint_info
                    if not info_unchanged:
                        logger.debug(
                            "CustomEndpointMonitor.DetectedChangeInCustomEndpointInfo",
                            self._custom_endpoint_host_info.host, endpoint_info)

                        # The custom endpoint info has changed, so we need to update the set of allowed/blocked
                        # hosts.
                        hosts = AllowedAndBlockedHosts(endpoint_info.static_members, endpoint_info.excluded_members)
                        self._plugin_service.allowed_and_blocked_hosts = hosts
                        CustomEndpointMonitor._custom_endpoint_info_cache.put(
                            self._custom_endpoint_host_info.host,
                            endpoint_info,
                            CustomEndpointMonitor._CUSTOM_ENDPOINT_INFO_EXPIRATION_NS)
                        self._info_changed_counter.inc()

                    # Sleep only for what is left of the refresh interval after this iteration's work.
                    # NOTE(review): sleep() does not wake up when _stop_event is set; if _stop_event is a
                    # threading.Event, _stop_event.wait(timeout) would make shutdown more responsive - confirm.
                    elapsed_time = perf_counter_ns() - start_ns
                    sleep_duration = max(0, self._refresh_rate_ns - elapsed_time)
                    sleep(sleep_duration / 1_000_000_000)
                except InterruptedError:
                    # Propagate to the outer handler, which ends the monitoring loop.
                    raise
                except Exception as e:
                    # If the exception is not an InterruptedError, log it and continue monitoring.
                    logger.error("CustomEndpointMonitor.Exception", self._custom_endpoint_host_info.host, e)
                    # Back off for one refresh interval before retrying; without this a persistent failure
                    # (bad credentials, throttling, network outage) would spin and hammer the API.
                    sleep(self._refresh_rate_ns / 1_000_000_000)
        except InterruptedError:
            logger.info("CustomEndpointMonitor.Interrupted", self._custom_endpoint_host_info.host)
        finally:
            CustomEndpointMonitor._custom_endpoint_info_cache.remove(self._custom_endpoint_host_info.host)
            self._stop_event.set()
            self._client.close()
            logger.debug("CustomEndpointMonitor.StoppedMonitor", self._custom_endpoint_host_info.host)