ees_microsoft_outlook/microsoft_exchange_server_user.py (85 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # """This module is used to get users accounts from microsoft exchange server. """ import warnings from urllib.parse import urlparse import requests.adapters from exchangelib import (IMPERSONATION, Account, Configuration, Credentials, FaultTolerance) from exchangelib.protocol import BaseProtocol, NoVerifyHTTPAdapter from ldap3 import SAFE_SYNC, Connection, Server global_dns_name = "" global_ssl_certificate_path = "" class RootCAAdapter(requests.adapters.HTTPAdapter): """This class is use to verify ssl certificate""" def cert_verify(self, conn, url, ssl_certificate_file, cert): """This method is used to verify certificate :param conn: The urllib3 connection object associated with the cert. :param url: The requested URL. :param ssl_certificate_file: Dictionary which contain ssl certificate :param cert: The SSL certificate to verify. """ ssl_certificate_file = { global_dns_name: global_ssl_certificate_path, }[urlparse(url).hostname] super().cert_verify(conn=conn, url=url, verify=ssl_certificate_file, cert=cert) class MicrosoftExchangeServerUser: """This class fetch users and user accounts""" def __init__(self, config): self.config = config def get_users(self): """Fetch users from Exchange Active Directory Returns: response: Fetched response from Exchange Active Directory """ warnings.filterwarnings("ignore") try: server = Server( self.config.get_value("microsoft_exchange.active_directory_server") ) conn = Connection( server, self.config.get_value("microsoft_exchange.username"), self.config.get_value("microsoft_exchange.password"), client_strategy=SAFE_SYNC, auto_bind=True, ) domain_name_list = self.config.get_value("microsoft_exchange.domain").split( "." ) ldap_domain_name_list = ["DC=" + domain for domain in domain_name_list] search_query = ",".join(map(str, ldap_domain_name_list)) status, _, response, _ = conn.search( search_query, "(&(objectCategory=person)(objectClass=user)(givenName=*))", attributes=["mail"], ) if status: return response else: raise Exception( "Error while searching users from Exchange Active Directory." ) except Exception as exception: raise Exception( f"Error while fetching users from Exchange Active Directory. Error: {exception}" ) def get_users_accounts(self, users): """Fetch user account from exchange server :param users: Fetch users from Exchange Active Directory Returns: users_accounts: List of all user accounts """ users_accounts = [] try: # Logic to establish secure connection when SSL is enabled into exchange server host name if self.config.get_value("microsoft_exchange.secure_connection"): global global_dns_name, global_ssl_certificate_path global_dns_name = self.config.get_value("microsoft_exchange.server") global_ssl_certificate_path = self.config.get_value( "microsoft_exchange.certificate_path" ) BaseProtocol.HTTP_ADAPTER_CLS = RootCAAdapter else: BaseProtocol.HTTP_ADAPTER_CLS = NoVerifyHTTPAdapter for user in users: if "searchResRef" not in user["type"]: credentials = Credentials( self.config.get_value("microsoft_exchange.username"), self.config.get_value("microsoft_exchange.password"), ) config = Configuration( server=self.config.get_value("microsoft_exchange.server"), credentials=credentials, retry_policy=FaultTolerance(max_wait=900), ) user_account = Account( primary_smtp_address=user["attributes"]["mail"], config=config, access_type=IMPERSONATION, ) users_accounts.append(user_account) return users_accounts except Exception as exception: raise Exception( f"Error while fetching users account from exchange server. Error: {exception}" )