in aws_advanced_python_wrapper/host_list_provider.py [0:0]
def _initialize(self):
    """Set up the initial host, the instance template and the cluster ID.

    ``self._is_initialized`` is checked once before and once after acquiring
    ``self._lock``; the flag is only set after every step below has completed
    without raising, so a finished setup is never repeated.

    Steps, in order:

    1. Build the initial ``HostInfo`` from the ``host`` / ``port`` entries of
       ``self._props`` and publish it on the host list provider service.
    2. Build the cluster instance template, either from the configured
       ``CLUSTER_INSTANCE_HOST_PATTERN`` (``host`` or ``host:port``) or from
       the instance host pattern derived from the initial host, then
       validate the template's host.
    3. Pick the cluster ID: the configured ``CLUSTER_ID`` when present, the
       initial host URL for an RDS proxy, otherwise (for other RDS URL
       types) a suggested cluster ID or the RDS cluster host URL. For any
       other URL type the cluster ID is left untouched.

    Raises:
        ValueError: If the configured host pattern carries a port that is
            not an integer.
    """
    if self._is_initialized:
        return

    with self._lock:
        # Re-check under the lock: the flag may have been set while this
        # caller was waiting to acquire it.
        if self._is_initialized:
            return

        host_availability_strategy = create_host_availability_strategy(self._props)
        self._initial_host_info: HostInfo = HostInfo(
            host=self._props.get("host"),
            port=self._props.get("port", HostInfo.NO_PORT),
            host_availability_strategy=host_availability_strategy)
        self._initial_hosts: Tuple[HostInfo, ...] = (self._initial_host_info,)
        self._host_list_provider_service.initial_connection_host_info = self._initial_host_info

        host_pattern = WrapperProperties.CLUSTER_INSTANCE_HOST_PATTERN.get(self._props)
        if host_pattern:
            # Split on the LAST colon only, so a pattern containing several
            # colons no longer fails with an opaque tuple-unpacking error.
            pattern_host, separator, port_text = host_pattern.rpartition(":")
            port = HostInfo.NO_PORT
            if separator:
                if port_text:
                    # The port must be numeric like HostInfo.NO_PORT, not the
                    # raw substring of the pattern.
                    try:
                        port = int(port_text)
                    except ValueError as err:
                        raise ValueError(
                            f"Invalid port '{port_text}' in cluster instance "
                            f"host pattern '{host_pattern}'") from err
                # A trailing colon with nothing after it means "no port".
                host_pattern = pattern_host
            self._cluster_instance_template = HostInfo(
                host=host_pattern,
                port=port,
                host_availability_strategy=host_availability_strategy)
        else:
            # No explicit pattern configured: derive one from the initial host
            # and inherit its ID and port.
            self._cluster_instance_template = HostInfo(
                host=self._rds_utils.get_rds_instance_host_pattern(self._initial_host_info.host),
                host_id=self._initial_host_info.host_id,
                port=self._initial_host_info.port,
                host_availability_strategy=host_availability_strategy)
        self._validate_host_pattern(self._cluster_instance_template.host)

        self._rds_url_type: RdsUrlType = self._rds_utils.identify_rds_type(self._initial_host_info.host)

        cluster_id = WrapperProperties.CLUSTER_ID.get(self._props)
        if cluster_id:
            # An explicitly configured cluster ID always wins.
            self._cluster_id = cluster_id
        elif self._rds_url_type == RdsUrlType.RDS_PROXY:
            self._cluster_id = self._initial_host_info.url
        elif self._rds_url_type.is_rds:
            cluster_id_suggestion = self._get_suggested_cluster_id(self._initial_host_info.url)
            if cluster_id_suggestion and cluster_id_suggestion.cluster_id:
                # The initial URL matches an entry in the topology cache for an existing cluster ID.
                # Update this cluster ID to match the existing one so that topology info can be shared.
                self._cluster_id = cluster_id_suggestion.cluster_id
                self._is_primary_cluster_id = cluster_id_suggestion.is_primary_cluster_id
            else:
                cluster_url = self._rds_utils.get_rds_cluster_host_url(self._initial_host_info.host)
                if cluster_url is not None:
                    # Append the template port only when one was specified.
                    self._cluster_id = f"{cluster_url}:{self._cluster_instance_template.port}" \
                        if self._cluster_instance_template.is_port_specified() else cluster_url
                    self._is_primary_cluster_id = True
                    self._is_primary_cluster_id_cache.put(self._cluster_id, True,
                                                          self._suggested_cluster_id_refresh_ns)

        # Only mark as initialized once everything above has succeeded.
        self._is_initialized = True