in providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/hooks/winrm.py [0:0]
def get_conn(self):
    """
    Create the WinRM ``Protocol`` for this hook and return it.

    Settings are resolved in this order:

    1. Values already set on the hook (``username``, ``password``, ``remote_host``).
    2. The Airflow connection referenced by ``ssh_conn_id`` (login, password, host),
       used only for values that are still ``None``.
    3. Keys found in the connection's extra JSON, which overwrite the matching
       hook attributes whenever they are present.

    If no username is available it falls back to ``getuser()``; if no endpoint is
    available one is built as ``http://<remote_host>:<remote_port>/wsman``.

    A new ``Protocol`` is only created when a non-blank password is available.
    When no password is available, a protocol already stored on the hook is
    returned as is.

    :return: the protocol object stored in ``self.winrm_protocol``
    :raises AirflowException: if ``remote_host`` is missing, if creating the
        ``Protocol`` fails, or if no protocol exists because no password was
        provided.
    """
    self.log.debug("Creating WinRM client for conn_id: %s", self.ssh_conn_id)

    if self.ssh_conn_id is not None:
        conn = self.get_connection(self.ssh_conn_id)

        # Explicit hook arguments win over the stored connection values.
        if self.username is None:
            self.username = conn.login
        if self.password is None:
            self.password = conn.password
        if self.remote_host is None:
            self.remote_host = conn.host

        if conn.extra is not None:
            extra_options = conn.extra_dejson

            def as_flag(value):
                # Extras may hold JSON booleans or strings; only "true" (any case) enables.
                return str(value).lower() == "true"

            # (extra key == hook attribute name, converter), applied in this order.
            extra_converters = (
                ("endpoint", str),
                ("remote_port", int),
                ("transport", str),
                ("service", str),
                ("keytab", str),
                ("ca_trust_path", str),
                ("cert_pem", str),
                ("cert_key_pem", str),
                ("server_cert_validation", str),
                ("kerberos_delegation", as_flag),
                ("read_timeout_sec", int),
                ("operation_timeout_sec", int),
                ("kerberos_hostname_override", str),
                ("message_encryption", str),
                ("credssp_disable_tlsv1_2", as_flag),
                ("send_cbt", as_flag),
            )
            for option, convert in extra_converters:
                if option in extra_options:
                    setattr(self, option, convert(extra_options[option]))

    if not self.remote_host:
        raise AirflowException("Missing required param: remote_host")

    # Auto detecting username values from system
    if not self.username:
        self.log.debug(
            "username to WinRM to host: %s is not specified for connection id"
            " %s. Using system's default provided by getpass.getuser()",
            self.remote_host,
            self.ssh_conn_id,
        )
        self.username = getuser()

    # If endpoint is not set, then build a standard wsman endpoint from host and port.
    if not self.endpoint:
        self.endpoint = f"http://{self.remote_host}:{self.remote_port}/wsman"

    try:
        if self.password and self.password.strip():
            self.winrm_protocol = Protocol(
                endpoint=self.endpoint,
                transport=self.transport,
                username=self.username,
                password=self.password,
                service=self.service,
                keytab=self.keytab,
                ca_trust_path=self.ca_trust_path,
                cert_pem=self.cert_pem,
                cert_key_pem=self.cert_key_pem,
                server_cert_validation=self.server_cert_validation,
                kerberos_delegation=self.kerberos_delegation,
                read_timeout_sec=self.read_timeout_sec,
                operation_timeout_sec=self.operation_timeout_sec,
                kerberos_hostname_override=self.kerberos_hostname_override,
                message_encryption=self.message_encryption,
                credssp_disable_tlsv1_2=self.credssp_disable_tlsv1_2,
                send_cbt=self.send_cbt,
            )
    except Exception as error:
        error_msg = f"Error creating connection to host: {self.remote_host}, error: {error}"
        self.log.error(error_msg)
        # Chain explicitly so the original traceback is reported as the cause.
        raise AirflowException(error_msg) from error

    # Without a password no Protocol is built above. Fail with a clear message
    # instead of an AttributeError on the compatibility shim below.
    protocol = getattr(self, "winrm_protocol", None)
    if protocol is None:
        raise AirflowException(
            f"Error creating connection to host: {self.remote_host}, error: "
            "no WinRM protocol was created because no password was provided"
        )

    if not hasattr(protocol, "get_command_output_raw"):
        # since pywinrm>=0.5 get_command_output_raw replace _raw_get_command_output;
        # alias the old private name so callers can always use the new one.
        protocol.get_command_output_raw = protocol._raw_get_command_output

    return protocol