in providers/google/src/airflow/providers/google/cloud/hooks/compute_ssh.py [0:0]
def get_conn(self) -> paramiko.SSHClient:
"""Return SSH connection."""
self._load_connection_config()
if not self.project_id:
self.project_id = self._compute_hook.project_id
missing_fields = [k for k in ["instance_name", "zone", "project_id"] if not getattr(self, k)]
if not self.instance_name or not self.zone or not self.project_id:
raise AirflowException(
f"Required parameters are missing: {missing_fields}. These parameters be passed either as "
"keyword parameter or as extra field in Airflow connection definition. Both are not set!"
)
self.log.info(
"Connecting to instance: instance_name=%s, user=%s, zone=%s, "
"use_internal_ip=%s, use_iap_tunnel=%s, use_os_login=%s",
self.instance_name,
self.user,
self.zone,
self.use_internal_ip,
self.use_iap_tunnel,
self.use_oslogin,
)
if not self.hostname:
hostname = self._compute_hook.get_instance_address(
zone=self.zone,
resource_id=self.instance_name,
project_id=self.project_id,
use_internal_ip=self.use_internal_ip or self.use_iap_tunnel,
)
else:
hostname = self.hostname
privkey, pubkey = self._generate_ssh_key(self.user)
max_delay = 10
sshclient = None
for retry in range(self.max_retries + 1):
try:
if self.use_oslogin:
user = self._authorize_os_login(pubkey)
else:
user = self.user
self._authorize_compute_engine_instance_metadata(pubkey)
proxy_command = None
if self.use_iap_tunnel:
proxy_command_args = [
"gcloud",
"compute",
"start-iap-tunnel",
str(self.instance_name),
"22",
"--listen-on-stdin",
f"--project={self.project_id}",
f"--zone={self.zone}",
"--verbosity=warning",
]
if self.impersonation_chain:
proxy_command_args.append(f"--impersonate-service-account={self.impersonation_chain}")
proxy_command = " ".join(shlex.quote(arg) for arg in proxy_command_args)
sshclient = self._connect_to_instance(user, hostname, privkey, proxy_command)
break
except (HttpError, AirflowException, SSHException) as exc:
if (isinstance(exc, HttpError) and exc.resp.status == 412) or (
isinstance(exc, AirflowException) and "412 PRECONDITION FAILED" in str(exc)
):
self.log.info("Error occurred when trying to update instance metadata: %s", exc)
elif isinstance(exc, SSHException):
self.log.info("Error occurred when establishing SSH connection using Paramiko: %s", exc)
else:
raise
if retry == self.max_retries:
raise AirflowException("Maximum retries exceeded. Aborting operation.")
delay = random.randint(0, max_delay)
self.log.info("Failed establish SSH connection, waiting %s seconds to retry...", delay)
time.sleep(delay)
if not sshclient:
raise AirflowException("Unable to establish SSH connection.")
return sshclient