in google/cloud/alloydb/connector/static.py [0:0]
def __init__(self, instance_uri: str, static_conn_info: io.TextIOBase) -> None:
"""
Initializes a StaticConnectionInfoCache instance.
Args:
instance_uri (str): The AlloyDB instance's connection URI.
static_conn_info (io.TextIOBase): The static connection info JSON.
"""
static_info = json.load(static_conn_info)
ca_cert = static_info[instance_uri]["caCert"]
cert_chain = static_info[instance_uri]["pemCertificateChain"]
dns = ""
if static_info[instance_uri]["pscInstanceConfig"]:
dns = static_info[instance_uri]["pscInstanceConfig"]["pscDnsName"].rstrip(
"."
)
ip_addrs = {
"PRIVATE": static_info[instance_uri]["ipAddress"],
"PUBLIC": static_info[instance_uri]["publicIpAddress"],
"PSC": dns,
}
expiration = datetime.now(timezone.utc) + timedelta(hours=1)
priv_key = static_info["privateKey"]
priv_key_bytes = serialization.load_pem_private_key(
priv_key.encode("UTF-8"), password=None
)
self._info = ConnectionInfo(
cert_chain, ca_cert, priv_key_bytes, ip_addrs, expiration
)