def get_conn()

in providers/google/src/airflow/providers/google/cloud/hooks/compute_ssh.py [0:0]


    def get_conn(self) -> paramiko.SSHClient:
        """Return SSH connection."""
        self._load_connection_config()
        if not self.project_id:
            self.project_id = self._compute_hook.project_id

        missing_fields = [k for k in ["instance_name", "zone", "project_id"] if not getattr(self, k)]
        if not self.instance_name or not self.zone or not self.project_id:
            raise AirflowException(
                f"Required parameters are missing: {missing_fields}. These parameters be passed either as "
                "keyword parameter or as extra field in Airflow connection definition. Both are not set!"
            )

        self.log.info(
            "Connecting to instance: instance_name=%s, user=%s, zone=%s, "
            "use_internal_ip=%s, use_iap_tunnel=%s, use_os_login=%s",
            self.instance_name,
            self.user,
            self.zone,
            self.use_internal_ip,
            self.use_iap_tunnel,
            self.use_oslogin,
        )
        if not self.hostname:
            hostname = self._compute_hook.get_instance_address(
                zone=self.zone,
                resource_id=self.instance_name,
                project_id=self.project_id,
                use_internal_ip=self.use_internal_ip or self.use_iap_tunnel,
            )
        else:
            hostname = self.hostname

        privkey, pubkey = self._generate_ssh_key(self.user)

        max_delay = 10
        sshclient = None
        for retry in range(self.max_retries + 1):
            try:
                if self.use_oslogin:
                    user = self._authorize_os_login(pubkey)
                else:
                    user = self.user
                    self._authorize_compute_engine_instance_metadata(pubkey)
                proxy_command = None
                if self.use_iap_tunnel:
                    proxy_command_args = [
                        "gcloud",
                        "compute",
                        "start-iap-tunnel",
                        str(self.instance_name),
                        "22",
                        "--listen-on-stdin",
                        f"--project={self.project_id}",
                        f"--zone={self.zone}",
                        "--verbosity=warning",
                    ]
                    if self.impersonation_chain:
                        proxy_command_args.append(f"--impersonate-service-account={self.impersonation_chain}")
                    proxy_command = " ".join(shlex.quote(arg) for arg in proxy_command_args)
                sshclient = self._connect_to_instance(user, hostname, privkey, proxy_command)
                break
            except (HttpError, AirflowException, SSHException) as exc:
                if (isinstance(exc, HttpError) and exc.resp.status == 412) or (
                    isinstance(exc, AirflowException) and "412 PRECONDITION FAILED" in str(exc)
                ):
                    self.log.info("Error occurred when trying to update instance metadata: %s", exc)
                elif isinstance(exc, SSHException):
                    self.log.info("Error occurred when establishing SSH connection using Paramiko: %s", exc)
                else:
                    raise
                if retry == self.max_retries:
                    raise AirflowException("Maximum retries exceeded. Aborting operation.")
                delay = random.randint(0, max_delay)
                self.log.info("Failed establish SSH connection, waiting %s seconds to retry...", delay)
                time.sleep(delay)
        if not sshclient:
            raise AirflowException("Unable to establish SSH connection.")
        return sshclient