def get_conn()

in providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/hooks/winrm.py [0:0]


    def get_conn(self):
        self.log.debug("Creating WinRM client for conn_id: %s", self.ssh_conn_id)
        if self.ssh_conn_id is not None:
            conn = self.get_connection(self.ssh_conn_id)

            if self.username is None:
                self.username = conn.login
            if self.password is None:
                self.password = conn.password
            if self.remote_host is None:
                self.remote_host = conn.host

            if conn.extra is not None:
                extra_options = conn.extra_dejson

                if "endpoint" in extra_options:
                    self.endpoint = str(extra_options["endpoint"])
                if "remote_port" in extra_options:
                    self.remote_port = int(extra_options["remote_port"])
                if "transport" in extra_options:
                    self.transport = str(extra_options["transport"])
                if "service" in extra_options:
                    self.service = str(extra_options["service"])
                if "keytab" in extra_options:
                    self.keytab = str(extra_options["keytab"])
                if "ca_trust_path" in extra_options:
                    self.ca_trust_path = str(extra_options["ca_trust_path"])
                if "cert_pem" in extra_options:
                    self.cert_pem = str(extra_options["cert_pem"])
                if "cert_key_pem" in extra_options:
                    self.cert_key_pem = str(extra_options["cert_key_pem"])
                if "server_cert_validation" in extra_options:
                    self.server_cert_validation = str(extra_options["server_cert_validation"])
                if "kerberos_delegation" in extra_options:
                    self.kerberos_delegation = str(extra_options["kerberos_delegation"]).lower() == "true"
                if "read_timeout_sec" in extra_options:
                    self.read_timeout_sec = int(extra_options["read_timeout_sec"])
                if "operation_timeout_sec" in extra_options:
                    self.operation_timeout_sec = int(extra_options["operation_timeout_sec"])
                if "kerberos_hostname_override" in extra_options:
                    self.kerberos_hostname_override = str(extra_options["kerberos_hostname_override"])
                if "message_encryption" in extra_options:
                    self.message_encryption = str(extra_options["message_encryption"])
                if "credssp_disable_tlsv1_2" in extra_options:
                    self.credssp_disable_tlsv1_2 = (
                        str(extra_options["credssp_disable_tlsv1_2"]).lower() == "true"
                    )
                if "send_cbt" in extra_options:
                    self.send_cbt = str(extra_options["send_cbt"]).lower() == "true"

        if not self.remote_host:
            raise AirflowException("Missing required param: remote_host")

        # Auto detecting username values from system
        if not self.username:
            self.log.debug(
                "username to WinRM to host: %s is not specified for connection id"
                " %s. Using system's default provided by getpass.getuser()",
                self.remote_host,
                self.ssh_conn_id,
            )
            self.username = getuser()

        # If endpoint is not set, then build a standard wsman endpoint from host and port.
        if not self.endpoint:
            self.endpoint = f"http://{self.remote_host}:{self.remote_port}/wsman"

        try:
            if self.password and self.password.strip():
                self.winrm_protocol = Protocol(
                    endpoint=self.endpoint,
                    transport=self.transport,
                    username=self.username,
                    password=self.password,
                    service=self.service,
                    keytab=self.keytab,
                    ca_trust_path=self.ca_trust_path,
                    cert_pem=self.cert_pem,
                    cert_key_pem=self.cert_key_pem,
                    server_cert_validation=self.server_cert_validation,
                    kerberos_delegation=self.kerberos_delegation,
                    read_timeout_sec=self.read_timeout_sec,
                    operation_timeout_sec=self.operation_timeout_sec,
                    kerberos_hostname_override=self.kerberos_hostname_override,
                    message_encryption=self.message_encryption,
                    credssp_disable_tlsv1_2=self.credssp_disable_tlsv1_2,
                    send_cbt=self.send_cbt,
                )

        except Exception as error:
            error_msg = f"Error creating connection to host: {self.remote_host}, error: {error}"
            self.log.error(error_msg)
            raise AirflowException(error_msg)

        if not hasattr(self.winrm_protocol, "get_command_output_raw"):
            # since pywinrm>=0.5 get_command_output_raw replace _raw_get_command_output
            self.winrm_protocol.get_command_output_raw = self.winrm_protocol._raw_get_command_output

        return self.winrm_protocol