in task-sdk/src/airflow/sdk/definitions/connection.py [0:0]
def get_uri(self) -> str:
"""Generate and return connection in URI format."""
from urllib.parse import parse_qsl, quote, urlencode
if self.conn_type and "_" in self.conn_type:
log.warning(
"Connection schemes (type: %s) shall not contain '_' according to RFC3986.",
self.conn_type,
)
if self.conn_type:
uri = f"{self.conn_type.lower().replace('_', '-')}://"
else:
uri = "//"
if self.host and "://" in self.host:
protocol, host = self.host.split("://", 1)
else:
protocol, host = None, self.host or ""
if protocol:
uri += f"{protocol}://"
authority_block = ""
if self.login is not None:
authority_block += quote(self.login, safe="")
if self.password is not None:
authority_block += ":" + quote(self.password, safe="")
if authority_block > "":
authority_block += "@"
uri += authority_block
host_block = ""
if host != "":
host_block += quote(host, safe="")
if self.port:
if host_block == "" and authority_block == "":
host_block += f"@:{self.port}"
else:
host_block += f":{self.port}"
if self.schema:
host_block += f"/{quote(self.schema, safe='')}"
uri += host_block
if self.extra:
try:
query: str | None = urlencode(self.extra_dejson)
except TypeError:
query = None
if query and self.extra_dejson == dict(parse_qsl(query, keep_blank_values=True)):
uri += ("?" if self.schema else "/?") + query
else:
uri += ("?" if self.schema else "/?") + urlencode({self.EXTRA_KEY: self.extra})
return uri