in mysqlx-connector-python/lib/mysqlx/connection.py [0:0]
def _parse_connection_uri(uri: str) -> Dict[str, Any]:
"""Parses the connection string and returns a dictionary with the
connection settings.
Args:
uri: mysqlx URI scheme to connect to a MySQL server/farm.
Returns:
Returns a dict with parsed values of credentials and address of the
MySQL server/farm.
Raises:
:class:`mysqlx.InterfaceError`: If contains a invalid option.
"""
settings: Dict[str, Any] = {"schema": ""}
match = _URI_SCHEME_RE.match(uri)
scheme, uri = match.groups() if match else ("mysqlx", uri)
if scheme not in ("mysqlx", "mysqlx+srv"):
raise InterfaceError(f"Scheme '{scheme}' is not valid")
if scheme == "mysqlx+srv":
settings["dns-srv"] = True
userinfo, tmp = uri.partition("@")[::2]
host, query_str = tmp.partition("?")[::2]
pos = host.rfind("/")
if host[pos:].find(")") == -1 and pos > 0:
host, settings["schema"] = host.rsplit("/", 1)
host = host.strip("()")
if not host or not userinfo or ":" not in userinfo:
raise InterfaceError(f"Malformed URI '{uri}'")
user, password = userinfo.split(":", 1)
settings["user"], settings["password"] = unquote(user), unquote(password)
if host.startswith(("/", "..", ".")):
settings["socket"] = unquote(host)
elif host.startswith("\\."):
raise InterfaceError("Windows Pipe is not supported")
else:
settings.update(_parse_address_list(host))
invalid_options = ("user", "password", "dns-srv")
for key, val in parse_qsl(query_str, True):
opt = key.replace("_", "-").lower()
if opt in invalid_options:
raise InterfaceError(f"Invalid option: '{key}'")
if opt in _SSL_OPTS:
settings[opt] = unquote(val.strip("()"))
else:
val_str = val.lower()
if val_str in ("1", "true"):
settings[opt] = True
elif val_str in ("0", "false"):
settings[opt] = False
else:
settings[opt] = val_str
return settings