def _get_topology()

in aws_advanced_python_wrapper/host_list_provider.py [0:0]


    def _get_topology(self, conn: Optional[Connection], force_update: bool = False) -> FetchTopologyResult:
        """
        Resolve the topology of the database cluster, preferring the shared cache when allowed.

        The database is queried only when `force_update` is True or when the cache holds no (non-empty)
        topology for the current cluster ID. In every other case the cached topology is returned as is.

        :param conn: the connection used to run the topology query, if a query is needed. May be None.
        :param force_update: set to True to bypass a valid cache entry and query the database.
        :return: a :py:class:`FetchTopologyResult` pairing the host tuple with a flag that is True only
        when the hosts were served from the cache. When a query is needed but `conn` is missing, the
        initial hosts are returned. When the query yields no hosts, the cached hosts are returned if
        there are any, otherwise the initial hosts.
        :raises QueryTimeoutError: if a TimeoutError is raised while fetching or caching the topology.
        """
        self._initialize()

        # Another provider may have suggested a primary cluster ID for this cluster; adopt it if so.
        preferred_cluster_id = RdsHostListProvider._cluster_ids_to_update.get(self._cluster_id)
        if preferred_cluster_id and self._cluster_id != preferred_cluster_id:
            self._cluster_id = preferred_cluster_id
            self._is_primary_cluster_id = True

        cached_hosts = RdsHostListProvider._topology_cache.get(self._cluster_id)

        # Fast path: a usable cache entry exists and the caller did not ask for a refresh.
        if cached_hosts and not force_update:
            return RdsHostListProvider.FetchTopologyResult(cached_hosts, True)

        # A query is required from here on, which is impossible without a connection.
        # Fall back to the original hosts passed to the connect method.
        if not conn:
            return RdsHostListProvider.FetchTopologyResult(self._initial_hosts, False)

        try:
            dialect = self._host_list_provider_service.driver_dialect
            with_timeout = preserve_transaction_status_with_timeout(
                RdsHostListProvider._executor, self._max_timeout, dialect, conn)
            timed_topology_query = with_timeout(self._query_for_topology)

            fresh_hosts = timed_topology_query(conn)
            if fresh_hosts is not None and len(fresh_hosts) > 0:
                RdsHostListProvider._topology_cache.put(self._cluster_id, fresh_hosts, self._refresh_rate_ns)
                created_new_entry = cached_hosts is None
                if self._is_primary_cluster_id and created_new_entry:
                    # A primary cluster ID just gained its first cache entry. Look for non-primary
                    # cluster IDs that refer to the same cluster so they can share this topology.
                    self._suggest_cluster_id(fresh_hosts)
                return RdsHostListProvider.FetchTopologyResult(fresh_hosts, False)
        except TimeoutError as e:
            raise QueryTimeoutError(Messages.get("RdsHostListProvider.QueryForTopologyTimeout")) from e

        # The query produced no hosts: reuse whatever was cached, or the initial hosts as a last resort.
        if cached_hosts:
            return RdsHostListProvider.FetchTopologyResult(cached_hosts, True)
        return RdsHostListProvider.FetchTopologyResult(self._initial_hosts, False)