in aws_advanced_python_wrapper/host_list_provider.py [0:0]
def _get_topology(self, conn: Optional[Connection], force_update: bool = False) -> FetchTopologyResult:
"""
Get topology information for the database cluster. This method executes a database query if `force_update` is True,
if there is no information for the cluster in the cache, or if the cached topology is outdated.
Otherwise, the cached topology will be returned.
:param conn: the connection to use to fetch topology information, if necessary.
:param force_update: set to true to force the driver to query the database for
up-to-date topology information instead of relying on any cached information.
:return: a :py:class:`FetchTopologyResult` object containing the topology information
and whether the information came from the cache or a database query.
If the database was queried and the results did not include a writer instance, the topology information tuple will be empty.
"""
self._initialize()
suggested_primary_cluster_id = RdsHostListProvider._cluster_ids_to_update.get(self._cluster_id)
if suggested_primary_cluster_id and self._cluster_id != suggested_primary_cluster_id:
self._cluster_id = suggested_primary_cluster_id
self._is_primary_cluster_id = True
cached_hosts = RdsHostListProvider._topology_cache.get(self._cluster_id)
if not cached_hosts or force_update:
if not conn:
# Cannot fetch topology without a connection
# Return the original hosts passed to the connect method
return RdsHostListProvider.FetchTopologyResult(self._initial_hosts, False)
try:
driver_dialect = self._host_list_provider_service.driver_dialect
query_for_topology_func_with_timeout = preserve_transaction_status_with_timeout(
RdsHostListProvider._executor, self._max_timeout, driver_dialect, conn)(self._query_for_topology)
hosts = query_for_topology_func_with_timeout(conn)
if hosts is not None and len(hosts) > 0:
RdsHostListProvider._topology_cache.put(self._cluster_id, hosts, self._refresh_rate_ns)
if self._is_primary_cluster_id and cached_hosts is None:
# This cluster_id is primary and a new entry was just created in the cache. When this happens,
# we check for non-primary cluster IDs associated with the same cluster so that the topology
# info can be shared.
self._suggest_cluster_id(hosts)
return RdsHostListProvider.FetchTopologyResult(hosts, False)
except TimeoutError as e:
raise QueryTimeoutError(Messages.get("RdsHostListProvider.QueryForTopologyTimeout")) from e
if cached_hosts:
return RdsHostListProvider.FetchTopologyResult(cached_hosts, True)
else:
return RdsHostListProvider.FetchTopologyResult(self._initial_hosts, False)