in ad-joining/register-computer/ad/domain.py [0:0]
def connect(domain_controller, base_dn, user, password, use_ldaps=False, certificate_data=None):
logging.info("Connecting to LDAP endpoint of '%s' as '%s'" % (domain_controller, user))
if use_ldaps:
logging.info("Using LDAP over SSL/TLS")
tls_configuration = Tls(ssl.create_default_context(ssl.Purpose.SERVER_AUTH), validate=ssl.CERT_REQUIRED)
if certificate_data is not None:
logging.debug("Using CA certificate data from Secret Manager")
tls_configuration.ca_certs_data = certificate_data
server = ldap3.Server(domain_controller, port=636, connect_timeout=5, use_ssl=True, tls=tls_configuration)
else:
server = ldap3.Server(domain_controller, port=389, connect_timeout=5, use_ssl=False)
connection = ldap3.Connection(server, user=user, password=password, authentication=ldap3.NTLM, raise_exceptions=True, receive_timeout=20)
try:
if connection.bind():
return ActiveDirectoryConnection(domain_controller, connection, base_dn)
except LDAPStrongerAuthRequiredResult:
logging.exception("Failed to connect to LDAP endpoint: Active Directory requires LDAPS for NTLM binds")
except LDAPException as e:
logging.warn("Failed to connect to LDAP endpoint: %s" % e)
# LDAP connection could not be established, raise exception
raise LdapException("Connecting to LDAP endpoint of '%s' as '%s' failed" % (domain_controller, user))