ees_microsoft_outlook/configuration.py (135 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # """Configuration module allows manipulations with application configuration. This module can be used to read and validate configuration file that defines the settings of the Microsoft Outlook connector. """ import yaml from cerberus import Validator from yaml.error import YAMLError from .constant import ( CONNECTOR_TYPE_MICROSOFT_EXCHANGE, CONNECTOR_TYPE_OFFICE365, RFC_3339_DATETIME_FORMAT, ) from .schema import schema class ConfigurationInvalidException(Exception): """Exception raised when configuration was invalid. Attributes: errors - errors found in the configuration message -- explanation of the error """ def __init__(self, errors): super().__init__(f"Provided configuration was invalid. Errors: {errors}.") self.errors = errors class ConfigurationParsingException(Exception): """Exception raised when configuration could not be parsed. Attributes: file_name - name of the file that could not be parsed """ def __init__(self, file_name, inner_exception): super().__init__("Failed to parse configuration file.") self.file_name = file_name self.inner_exception = inner_exception class Configuration: """Configuration class is responsible for parsing, validating and accessing configuration options from connector configuration file.""" def __init__(self, file_name): self.__configurations = {} self.file_name = file_name try: with open(file_name, encoding="utf-8") as stream: self.__configurations = yaml.safe_load(stream) except YAMLError as exception: raise ConfigurationParsingException(file_name, exception) self.__configurations = self.validate() if self.__configurations["start_time"] >= self.__configurations["end_time"]: raise ConfigurationInvalidException( f"The start_time: {self.__configurations['start_time']} " f"cannot be greater than or equal to the end_time: {self.__configurations['end_time']}" ) # Converting datetime object to string for date_config in ["start_time", "end_time"]: value = self.__configurations[date_config] self.__configurations[date_config] = self.__parse_date_config_value(value) def validate(self): """Validates each properties defined in the yaml configuration file""" if ( self.__configurations["connector_platform_type"] and isinstance(self.__configurations[ "connector_platform_type"], str) and CONNECTOR_TYPE_OFFICE365 in self.__configurations["connector_platform_type"] ): schema.update( { "microsoft_exchange.secure_connection": { "required": False, "type": "boolean", "default": True, }, "microsoft_exchange.certificate_path": { "required": False, "type": "string", "empty": True, }, "office365.client_id": { "required": True, "type": "string", "empty": False, }, "office365.tenant_id": { "required": True, "type": "string", "empty": False, }, "office365.client_secret": { "required": True, "type": "string", "empty": False, }, } ) elif ( self.__configurations["connector_platform_type"] and isinstance(self.__configurations[ "connector_platform_type"], str) and CONNECTOR_TYPE_MICROSOFT_EXCHANGE in self.__configurations["connector_platform_type"] ): if self.__configurations["microsoft_exchange.secure_connection"] is False: schema.update( { "microsoft_exchange.secure_connection": { "required": True, "type": "boolean", "default": True, }, "microsoft_exchange.certificate_path": { "required": False, "type": "string", "empty": True, }, } ) schema.update( { "microsoft_exchange.active_directory_server": { "required": True, "type": "string", "empty": False, }, "microsoft_exchange.server": { "required": True, "type": "string", "empty": False, }, "microsoft_exchange.username": { "required": True, "type": "string", "empty": False, }, "microsoft_exchange.password": { "required": True, "type": "string", "empty": False, }, "microsoft_exchange.domain": { "required": True, "type": "string", "regex": r"^(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$", "empty": False, }, } ) else: raise ConfigurationInvalidException( "Enter valid connector platform type. Allowed values are 'Microsoft Exchange' and 'Office365'" ) validator = Validator(schema) validator.validate(self.__configurations, schema) if validator.errors: raise ConfigurationInvalidException(validator.errors) return validator.document def get_value(self, key): """Returns a configuration value that matches the key argument""" return self.__configurations.get(key) @staticmethod def __parse_date_config_value(string): """Change string to Datetime format""" return string.strftime(RFC_3339_DATETIME_FORMAT)