azurelinuxagent/ga/policy/policy_engine.py (144 lines of code) (raw):

# Copyright 2018 Microsoft Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Requires Python 2.4+ and Openssl 1.0+ # import json import re import os from azurelinuxagent.common.future import ustr from azurelinuxagent.common import logger from azurelinuxagent.common.event import WALAEventOperation, add_event from azurelinuxagent.common import conf from azurelinuxagent.common.exception import AgentError from azurelinuxagent.common.protocol.extensions_goal_state_from_vm_settings import _CaseFoldedDict from azurelinuxagent.common.utils.flexible_version import FlexibleVersion # Default policy values to be used when customer does not specify these attributes in the policy file. _DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY = False _DEFAULT_SIGNATURE_REQUIRED = False _DEFAULT_EXTENSIONS = {} # Agent supports up to this version of the policy file ("policyVersion" in schema). # Increment this number when any new attributes are added to the policy schema. _MAX_SUPPORTED_POLICY_VERSION = "0.1.0" class PolicyError(AgentError): """ Error raised during agent policy enforcement. """ class InvalidPolicyError(AgentError): """ Error raised if user-provided policy is invalid. """ def __init__(self, msg, inner=None): msg = "Customer-provided policy file ('{0}') is invalid, please correct the following error: {1}".format(conf.get_policy_file_path(), msg) super(InvalidPolicyError, self).__init__(msg, inner) class _PolicyEngine(object): """ Implements base policy engine API. """ def __init__(self): # Set defaults for policy self._policy_enforcement_enabled = self.__get_policy_enforcement_enabled() if not self.policy_enforcement_enabled: return self._policy = self._parse_policy(self.__read_policy()) @staticmethod def _log_policy_event(msg, is_success=True, op=WALAEventOperation.Policy, send_event=True): """ Log information to console and telemetry. """ if is_success: logger.info(msg) else: logger.error(msg) if send_event: add_event(op=op, message=msg, is_success=is_success, log_event=False) @staticmethod def __get_policy_enforcement_enabled(): """ Policy will be enabled if (1) policy file exists at the expected location and (2) the conf flag "Debug.EnableExtensionPolicy" is true. """ return conf.get_extension_policy_enabled() and os.path.exists(conf.get_policy_file_path()) @property def policy_enforcement_enabled(self): return self._policy_enforcement_enabled @staticmethod def __read_policy(): """ Read customer-provided policy JSON file, load and return as a dict. Policy file is expected to be at conf.get_policy_file_path(). Note that this method should only be called after verifying that the file exists (currently done in __init__). Raise InvalidPolicyError if JSON is invalid, or any exceptions are thrown while reading the file. """ with open(conf.get_policy_file_path(), 'r') as f: try: contents = f.read() _PolicyEngine._log_policy_event( "Policy enforcement is enabled. Enforcing policy using policy file found at '{0}'. File contents:\n{1}" .format(conf.get_policy_file_path(), contents)) # json.loads will raise error if file contents are not a valid json (including empty file). custom_policy = json.loads(contents) except ValueError as ex: msg = "policy file does not conform to valid json syntax" raise InvalidPolicyError(msg=msg, inner=ex) except Exception as ex: msg = "unable to read policy file" raise InvalidPolicyError(msg=msg, inner=ex) return custom_policy @staticmethod def _parse_policy(policy): """ Parses the given policy document and returns an equivalent document that has been populated with default values and verified for correctness, i.e. that conforms the following schema: { "policyVersion": "0.1.0", "extensionPolicies": { "allowListedExtensionsOnly": <true, false>, [Optional; default: false] "signatureRequired": <true, false>, [Optional; default: false] "extensions": { [Optional; default: {} (empty)] "<extension_name>": { "signatureRequired": <true, false> [Optional; no default] "runtimePolicy": <extension-specific policy> [Optional; no default] } }, } } Raises InvalidPolicyError if the policy document is invalid. """ if not isinstance(policy, dict): raise InvalidPolicyError("expected an object describing a Policy; got {0}.".format(type(policy).__name__)) _PolicyEngine._check_attributes(policy, object_name="policy", valid_attributes=["policyVersion", "extensionPolicies"]) return { "policyVersion": _PolicyEngine._parse_policy_version(policy), "extensionPolicies": _PolicyEngine._parse_extension_policies(policy) } @staticmethod def _parse_policy_version(policy): """ Validate and return "policyVersion" attribute. If not a string in the format "major[.minor[.patch]]", raise InvalidPolicyError. If policy_version is greater than maximum supported version, raise InvalidPolicyError. """ version = _PolicyEngine._get_string(policy, attribute="policyVersion") if not re.match(r"^\d+(\.\d+(\.\d+)?)?$", version): raise InvalidPolicyError("invalid value for attribute 'policyVersion': '{0}'; it should be in format 'major[.minor[.patch]]' (e.g., '1', '1.0', '1.0.0')".format(version)) if FlexibleVersion(_MAX_SUPPORTED_POLICY_VERSION) < FlexibleVersion(version): raise InvalidPolicyError("policy version '{0}' is not supported. The agent supports policy versions up to '{1}'.".format(version, _MAX_SUPPORTED_POLICY_VERSION)) return version @staticmethod def _parse_extension_policies(policy): """ Parses the "extensionPolicies" attribute of the policy document. See _parse_policy() for schema. """ extension_policies = _PolicyEngine._get_dictionary(policy, attribute="extensionPolicies", optional=True, default={}) _PolicyEngine._check_attributes(extension_policies, object_name="extensionPolicies", valid_attributes=["allowListedExtensionsOnly", "signatureRequired", "extensions"]) return { "allowListedExtensionsOnly": _PolicyEngine._get_boolean(extension_policies, attribute="allowListedExtensionsOnly", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY), "signatureRequired": _PolicyEngine._get_boolean(extension_policies, attribute="signatureRequired", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_SIGNATURE_REQUIRED), "extensions": _PolicyEngine._parse_extensions( _PolicyEngine._get_dictionary(extension_policies, attribute="extensions", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_EXTENSIONS) ) } @staticmethod def _parse_extensions(extensions): """ Parses the "extensions" attribute. See _parse_policy() for schema. The return value is a case-folded dict. CRP allows extensions to be any case, so we allow for case-insensitive lookup of individual extension policies. """ parsed = _CaseFoldedDict.from_dict({}) for extension, extension_policy in extensions.items(): if not isinstance(extension_policy, dict): raise InvalidPolicyError("invalid type {0} for attribute 'extensionPolicies.extensions.{1}'; must be 'object'".format(type(extension_policy).__name__, extension)) parsed[extension] = _PolicyEngine._parse_extension(extension_policy) return parsed @staticmethod def _parse_extension(extension): """ Parses an individual extension. See _parse_policy() for schema. """ extension_attribute_name = "extensionPolicies.extensions.{0}".format(extension) _PolicyEngine._check_attributes(extension, object_name=extension_attribute_name, valid_attributes=["signatureRequired", "runtimePolicy"]) return_value = {} signature_required = _PolicyEngine._get_boolean(extension, attribute="signatureRequired", name_prefix=extension_attribute_name, optional=True, default=None) if signature_required is not None: return_value["signatureRequired"] = signature_required # The runtimePolicy is an arbitrary object. runtime_policy = extension.get("runtimePolicy") if runtime_policy is not None: return_value["runtimePolicy"] = runtime_policy return return_value @staticmethod def _check_attributes(object_, object_name, valid_attributes): """ Check that the given object, which should be a dictionary, has only the specified attributes. If any other attributes are present, raise InvalidPolicyError. The object_name is used in the error message. """ for k in object_.keys(): if k not in valid_attributes: raise InvalidPolicyError("unrecognized attribute '{0}' in {1}".format(k, object_name)) @staticmethod def _get_dictionary(object_, attribute, name_prefix="", optional=False, default=None): """ Returns object[attribute] if it exists, verifying that it is a dictionary. If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError. If object_[attribute] is not a dictionary, raises InvalidPolicyError. The name_prefix indicates the path of the attribute within the policy document and is used in the error message. """ return _PolicyEngine._get_value(object_, attribute, name_prefix, dict, "object", optional=optional, default=default) @staticmethod def _get_string(object_, attribute, name_prefix="", optional=False, default=None): """ Returns object[attribute] if it exists, verifying that it is a string, else returns default. If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError. If object_[attribute] is not a string, raises InvalidPolicyError. The name_prefix indicates the path of the attribute within the policy document and is used in the error message. """ return _PolicyEngine._get_value(object_, attribute, name_prefix, ustr, "string", optional=optional, default=default) @staticmethod def _get_boolean(object_, attribute, name_prefix="", optional=False, default=None): """ Returns object[attribute] if it exists, verifying that it is a boolean, else returns default. If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError. If object_[attribute] is not a boolean, raises InvalidPolicyError. The name_prefix indicates the path of the attribute within the policy document and is used in the error message. """ return _PolicyEngine._get_value(object_, attribute, name_prefix, bool, "boolean", optional=optional, default=default) @staticmethod def _get_value(object_, attribute, name_prefix, type_, type_name, optional, default): """ Returns object[attribute] if it exists, verifying that it is of the given type_, else returns default. If object_[attribute] does not exist and 'optional' is True, returns 'default'; if 'optional' is False raises InvalidPolicyError. If the type of object_[attribute] is not 'type_', raises InvalidPolicyError. The name_prefix indicates the path of the attribute within the policy document, the type_name indicates a user-friendly name for type_; both are used in the error message. """ if default is not None and not optional: raise ValueError("default value should only be provided for optional attributes") value = object_.get(attribute) if value is None: if not optional: raise InvalidPolicyError("missing required attribute '{0}{1}'".format(name_prefix, attribute)) return default if not isinstance(value, type_): raise InvalidPolicyError("invalid type {0} for attribute '{1}{2}'; must be '{3}'".format(type(value).__name__, name_prefix, attribute, type_name)) return value class ExtensionPolicyEngine(_PolicyEngine): def should_allow_extension(self, extension_name): """ Return whether we should allow extension download based on policy. If policy feature not enabled, return True. If allowListedExtensionsOnly=true, return true only if extension present in "extensions" allowlist. If allowListedExtensions=false, return true always. """ if not self.policy_enforcement_enabled: return True allow_listed_extension_only = self._policy.get("extensionPolicies").get("allowListedExtensionsOnly") extension_allowlist = self._policy.get("extensionPolicies").get("extensions") should_allow = not allow_listed_extension_only or extension_allowlist.get(extension_name) is not None return should_allow def should_enforce_signature_validation(self, extension_name): """ Return whether we should enforce signature based on policy. If policy feature not enabled, return False. Individual policy takes precedence over global. """ if not self.policy_enforcement_enabled: return False global_signature_required = self._policy.get("extensionPolicies").get("signatureRequired") individual_policy = self._policy.get("extensionPolicies").get("extensions").get(extension_name) individual_signature_required = individual_policy.get("signatureRequired") if individual_policy is not None else None return individual_signature_required if individual_signature_required is not None else global_signature_required