def get_uri()

in task-sdk/src/airflow/sdk/definitions/connection.py [0:0]


    def get_uri(self) -> str:
        """Generate and return connection in URI format."""
        from urllib.parse import parse_qsl, quote, urlencode

        if self.conn_type and "_" in self.conn_type:
            log.warning(
                "Connection schemes (type: %s) shall not contain '_' according to RFC3986.",
                self.conn_type,
            )
        if self.conn_type:
            uri = f"{self.conn_type.lower().replace('_', '-')}://"
        else:
            uri = "//"

        if self.host and "://" in self.host:
            protocol, host = self.host.split("://", 1)
        else:
            protocol, host = None, self.host or ""
        if protocol:
            uri += f"{protocol}://"

        authority_block = ""
        if self.login is not None:
            authority_block += quote(self.login, safe="")
        if self.password is not None:
            authority_block += ":" + quote(self.password, safe="")
        if authority_block > "":
            authority_block += "@"
            uri += authority_block

        host_block = ""
        if host != "":
            host_block += quote(host, safe="")
        if self.port:
            if host_block == "" and authority_block == "":
                host_block += f"@:{self.port}"
            else:
                host_block += f":{self.port}"
        if self.schema:
            host_block += f"/{quote(self.schema, safe='')}"
        uri += host_block

        if self.extra:
            try:
                query: str | None = urlencode(self.extra_dejson)
            except TypeError:
                query = None
            if query and self.extra_dejson == dict(parse_qsl(query, keep_blank_values=True)):
                uri += ("?" if self.schema else "/?") + query
            else:
                uri += ("?" if self.schema else "/?") + urlencode({self.EXTRA_KEY: self.extra})

        return uri