in airflow-core/src/airflow/models/connection.py [0:0]
def get_uri(self) -> str:
"""
Return the connection URI in Airflow format.
The Airflow URI format examples: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#uri-format-example
Note that the URI returned by this method is **not** SQLAlchemy-compatible, if you need a SQLAlchemy-compatible URI, use the :attr:`~airflow.providers.common.sql.hooks.sql.DbApiHook.sqlalchemy_url`
"""
if self.conn_type and "_" in self.conn_type:
self.log.warning(
"Connection schemes (type: %s) shall not contain '_' according to RFC3986.",
self.conn_type,
)
if self.conn_type:
uri = f"{self.conn_type.lower().replace('_', '-')}://"
else:
uri = "//"
if self.host and "://" in self.host:
protocol, host = self.host.split("://", 1)
else:
protocol, host = None, self.host
if protocol:
uri += f"{protocol}://"
authority_block = ""
if self.login is not None:
authority_block += quote(self.login, safe="")
if self.password is not None:
authority_block += ":" + quote(self.password, safe="")
if authority_block > "":
authority_block += "@"
uri += authority_block
host_block = ""
if host:
host_block += quote(host, safe="")
if self.port:
if host_block == "" and authority_block == "":
host_block += f"@:{self.port}"
else:
host_block += f":{self.port}"
if self.schema:
host_block += f"/{quote(self.schema, safe='')}"
uri += host_block
if self.extra:
try:
query: str | None = urlencode(self.extra_dejson)
except TypeError:
query = None
if query and self.extra_dejson == dict(parse_qsl(query, keep_blank_values=True)):
uri += ("?" if self.schema else "/?") + query
else:
uri += ("?" if self.schema else "/?") + urlencode({self.EXTRA_KEY: self.extra})
return uri