azext_edge/edge/providers/orchestration/permissions.py (124 lines of code) (raw):

# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------

from enum import Enum
from typing import Iterable, Optional
from uuid import uuid4

from azure.cli.core.azclierror import ValidationError
from knack.log import get_logger

from ...util.az_client import get_authz_client

logger = get_logger(__name__)


# Lower-cased action patterns treated as granting Microsoft.Authorization/roleAssignments/write.
# Role definitions may use `*` wildcards, so the provider-wide ("microsoft.authorization/*") and
# resource-type-wide ("microsoft.authorization/roleassignments/*") forms are listed alongside
# the exact action. The same set is used to recognise notActions that take the permission away.
VALID_PERM_FORMS = frozenset(
    [
        "*",
        "*/write",
        "microsoft.authorization/*",
        "microsoft.authorization/*/write",
        "microsoft.authorization/roleassignments/*",
        "microsoft.authorization/roleassignments/write",
    ]
)
ROLE_DEF_FORMAT_STR = "/subscriptions/{subscription_id}/providers/Microsoft.Authorization/roleDefinitions/{role_id}"


def verify_write_permission_against_rg(subscription_id: str, resource_group_name: str):
    """
    Ensure the logged-in principal can write role assignments against a resource group.

    Returns None as soon as one permission entry lists a matching action in `actions`
    without a matching entry in `notActions`.

    Raises:
        ValidationError: when no permission entry for the resource group qualifies.
    """
    for permission in get_principal_permissions_for_group(
        subscription_id=subscription_id, resource_group_name=resource_group_name
    ):
        if _evaluate_permission(permission, VALID_PERM_FORMS) == PermissionState.ActionAllowed:
            return

    raise ValidationError(
        "This IoT Operations deployment config includes resource sync rules which require the logged-in principal\n"
        "to have permission to write role assignments (Microsoft.Authorization/roleAssignments/write) "
        "against the resource group.\n\n"
        "Run the command with --enable-rsync False to not include resource sync rules in the deployment.\n"
    )


def get_principal_permissions_for_group(subscription_id: str, resource_group_name: str) -> Iterable[dict]:
    """Return the permission entries the authorization client lists for the resource group."""
    authz_client = get_authz_client(subscription_id=subscription_id)
    return authz_client.permissions.list_for_resource_group(resource_group_name)


class PermissionState(Enum):
    """Outcome of checking a single permission entry against a set of action patterns."""

    ActionAllowed = 1
    ActionDenied = 2
    ActionUndefined = 3


class PrincipalType(Enum):
    # There are more types but keeping this short with respect to what we use for now
    USER = "User"
    SERVICE_PRINCIPAL = "ServicePrincipal"


def _matches_any(patterns: Optional[Iterable[str]], valid_permissions: frozenset) -> bool:
    """Return True when any pattern, lower-cased, is a member of `valid_permissions`."""
    # `or []` tolerates a key that is present but set to None.
    return any(pattern.lower() in valid_permissions for pattern in patterns or [])


def _evaluate_permission(permission: dict, valid_permissions: frozenset) -> PermissionState:
    """
    Classify one permission entry.

    A matching `notActions` entry wins over a matching `actions` entry; when neither list
    contains a match the state is undefined.
    """
    if _matches_any(permission.get("notActions"), valid_permissions):
        return PermissionState.ActionDenied
    if _matches_any(permission.get("actions"), valid_permissions):
        return PermissionState.ActionAllowed
    return PermissionState.ActionUndefined


class PermissionManager:
    """Role assignment helpers bound to the authorization client of one subscription."""

    def __init__(self, subscription_id: str):
        self.authz_client = get_authz_client(
            subscription_id=subscription_id,
        )

    def apply_role_assignment(
        self, scope: str, principal_id: str, role_def_id: str, principal_type: Optional[str] = None
    ) -> Optional[dict]:
        """
        Create a role assignment for the principal at `scope` unless one already exists.

        Returns:
            None when the principal already holds `role_def_id` at the scope, otherwise the
            result of the create call.
        """
        role_assignments_iter = self.authz_client.role_assignments.list_for_scope(
            scope=scope, filter=f"principalId eq '{principal_id}'"
        )
        # Resource IDs are compared case-insensitively so a casing difference between the
        # service response and the caller's input does not trigger a duplicate create.
        target_role_def_id = role_def_id.lower()
        for role_assignment in role_assignments_iter:
            existing_role_def_id = role_assignment["properties"]["roleDefinitionId"]
            if existing_role_def_id and existing_role_def_id.lower() == target_role_def_id:
                return None

        props = {
            "properties": {
                "roleDefinitionId": role_def_id,
                "principalId": principal_id,
            }
        }
        if principal_type:
            props["properties"]["principalType"] = principal_type

        return self.authz_client.role_assignments.create(
            scope=scope,
            # Each new assignment gets a freshly generated name.
            role_assignment_name=str(uuid4()),
            parameters=props,
        )

    def can_apply_role_assignment(
        self,
        resource_group_name: str,
        resource_provider_namespace: str,
        parent_resource_path: str,
        resource_type: str,
        resource_name: str,
    ) -> bool:
        """
        Report whether the listed permissions for a resource allow writing role assignments.

        Returns:
            False if any permission entry denies the action or none allows it; True when at
            least one entry allows it and none denies it.
        """
        permissions = self._get_principal_permissions_for_resource(
            resource_group_name=resource_group_name,
            resource_provider_namespace=resource_provider_namespace,
            parent_resource_path=parent_resource_path,
            resource_type=resource_type,
            resource_name=resource_name,
        )
        action_allowed = False
        for permission in permissions:
            action_result = self._calculate_action(permission=permission, valid_permissions=VALID_PERM_FORMS)
            if action_result == PermissionState.ActionDenied:
                # A denial is final; no later entry can change the outcome.
                return False
            if action_result == PermissionState.ActionAllowed:
                action_allowed = True

        return action_allowed

    def _get_principal_permissions_for_resource(
        self,
        resource_group_name: str,
        resource_provider_namespace: str,
        parent_resource_path: str,
        resource_type: str,
        resource_name: str,
    ) -> Iterable:
        """Return the permission entries the authorization client lists for the resource."""
        return self.authz_client.permissions.list_for_resource(
            resource_group_name=resource_group_name,
            resource_provider_namespace=resource_provider_namespace,
            parent_resource_path=parent_resource_path,
            resource_type=resource_type,
            resource_name=resource_name,
        )

    def _calculate_action(self, permission: dict, valid_permissions: frozenset) -> PermissionState:
        """Classify `permission` against `valid_permissions`; see `_evaluate_permission`."""
        return _evaluate_permission(permission, valid_permissions)