def dataprotection_backup_instance_update_msi_permissions()

in src/dataprotection/azext_dataprotection/manual/custom.py [0:0]


def dataprotection_backup_instance_update_msi_permissions(cmd, resource_group_name, datasource_type, vault_name, operation,
                                                          permissions_scope, backup_instance=None, restore_request_object=None,
                                                          keyvault_id=None, snapshot_resource_group_id=None,
                                                          user_assigned_identity_arm_url=None,
                                                          target_storage_account_id=None, yes=False):
    """Assign the permissions required by a backup vault identity for a backup or restore.

    The set of permissions to grant is read from the manifest loaded for
    ``datasource_type``. Every permission that had to be created or changed is
    appended to the returned list; permissions that already exist are skipped.

    Args:
        cmd: CLI command context; ``cmd.cli_ctx`` is used to build clients and
            ``cmd.command_kwargs`` is updated when a key vault is involved.
        resource_group_name: Resource group of the backup vault.
        datasource_type: Datasource type name used to select the manifest.
        vault_name: Name of the backup vault.
        operation: ``"Backup"`` or ``"Restore"``.
        permissions_scope: Scope passed through to the role-assignment helpers.
        backup_instance: Backup instance dict; required when ``operation`` is ``"Backup"``.
        restore_request_object: Restore request; required when ``operation`` is ``"Restore"``.
        keyvault_id: ARM ID of the key vault; required for ``AzureDatabaseForPostgreSQL``.
        snapshot_resource_group_id: Forwarded to the role-assignment helper for restore.
        user_assigned_identity_arm_url: Forwarded to ``helper.get_vault_identity`` to
            pick the identity whose principal ID receives the permissions.
        target_storage_account_id: Forwarded to the role-assignment helper for restore.
        yes: When true, skip the interactive confirmation prompt.

    Returns:
        The list of permission objects that were newly assigned (possibly empty),
        or ``None`` if the user declined the confirmation prompt.

    Raises:
        RequiredArgumentMissingError: A conditionally required argument is missing.
        InvalidArgumentValueError: An argument is malformed or inconsistent with
            the other arguments, or restore is unsupported for the datasource type.
        UnauthorizedError: The key vault has public network access disabled.
    """
    # --- Argument validation (before any prompt or service call) ---
    if operation == 'Backup' and backup_instance is None:
        raise RequiredArgumentMissingError("--backup-instance needs to be given when --operation is given as Backup")

    if operation == "Restore" and restore_request_object is None:
        raise RequiredArgumentMissingError("--restore-request-object needs to be given when --operation is given as Restore")

    if datasource_type == 'AzureDatabaseForPostgreSQL':
        if not keyvault_id:
            raise RequiredArgumentMissingError("--keyvault-id needs to be given when --datasource-type is AzureDatabaseForPostgreSQL")
        if not is_valid_resource_id(keyvault_id):
            raise InvalidArgumentValueError("Please provide a valid keyvault ID")

    manifest = helper.load_manifest(datasource_type)

    # Ask for confirmation unless --yes was passed; declining aborts with no changes.
    warning_message = helper.get_help_text_on_grant_permissions_templatized(datasource_type, operation)
    if not yes and not prompt_y_n(warning_message):
        return None

    backup_vault = BackupVaultGet(cli_ctx=cmd.cli_ctx)(command_args={
        "resource_group": resource_group_name,
        "vault_name": vault_name
    })
    vault_principal_id = helper.get_vault_identity(backup_vault, user_assigned_identity_arm_url)

    # Accumulates one entry per permission that this call actually had to grant.
    role_assignments_arr = []

    if operation == "Backup":
        data_source_info = backup_instance["properties"]["data_source_info"]

        if helper.datasource_map[datasource_type] != data_source_info["datasource_type"]:
            raise InvalidArgumentValueError("--backup-instance provided is not compatible with the --datasource-type.")

        if data_source_info['resource_location'] != backup_vault['location']:
            raise InvalidArgumentValueError("Location of data source needs to be the same as backup vault.\nMake sure the datasource "
                                            "and vault are chosen properly")

        if manifest['supportSecretStoreAuthentication']:
            cmd.command_kwargs['operation_group'] = 'vaults'
            # Set when the key vault itself (access policy or network ACLs) is modified.
            keyvault_update = False

            from azure.cli.core.profiles import ResourceType
            from azure.cli.command_modules.keyvault._client_factory import Clients, data_plane_azure_keyvault_secret_client

            keyvault_params = parse_resource_id(keyvault_id)
            keyvault_subscription = keyvault_params['subscription']
            keyvault_name = keyvault_params['name']
            keyvault_rg = keyvault_params['resource_group']

            keyvault_client = getattr(get_mgmt_service_client(cmd.cli_ctx, ResourceType.MGMT_KEYVAULT, subscription_id=keyvault_subscription), Clients.vaults)

            keyvault = keyvault_client.get(resource_group_name=keyvault_rg, vault_name=keyvault_name)

            # Check if keyvault is not publicly accessible
            if keyvault.properties.public_network_access == 'Disabled':
                raise UnauthorizedError("Keyvault has public access disabled. Please enable public access, or grant access to your client IP")

            # Check if the secret URI provided in backup instance is a valid secret
            cmd.command_kwargs['vault_base_url'] = keyvault.properties.vault_uri
            data_secrets_client = data_plane_azure_keyvault_secret_client(cmd.cli_ctx, cmd.command_kwargs)
            secrets_list = data_secrets_client.list_properties_of_secrets()
            given_secret_uri = backup_instance['properties']['datasource_auth_credentials']['secret_store_resource']['uri']
            given_secret_id = helper.get_secret_params_from_uri(given_secret_uri)['secret_id']

            # any() stops at the first match, so the listing is not walked further than needed.
            if not any(given_secret_id == secret.id for secret in secrets_list):
                raise InvalidArgumentValueError("The secret URI provided in the --backup-instance is not associated with the "
                                                "--keyvault-id provided. Please input a valid combination of secret URI and "
                                                "--keyvault-id.")

            keyvault_permission_models = manifest['secretStorePermissions']
            if keyvault.properties.enable_rbac_authorization:
                # RBAC-enabled vault: grant the manifest's role unless it is already held.
                role = keyvault_permission_models['rbacModel']['roleDefinitionName']

                keyvault_assignment_scope = helper.truncate_id_using_scope(keyvault_id, permissions_scope)

                # The existence check is made at the key vault itself with inherited
                # assignments included, so an assignment at any enclosing scope counts.
                role_assignment = list_role_assignments(cmd, assignee=vault_principal_id, role=role, scope=keyvault_id, include_inherited=True)
                if not role_assignment:
                    assignment = create_role_assignment(cmd, assignee=vault_principal_id, role=role, scope=keyvault_assignment_scope)
                    role_assignments_arr.append(helper.get_permission_object_from_role_object(assignment))

            else:
                # Access-policy vault: make sure the vault identity's policy holds every
                # secret permission listed in the manifest.
                from azure.cli.command_modules.keyvault.custom import set_policy
                vault_secret_permissions = (keyvault_permission_models['vaultAccessPolicyModel']
                                            ['accessPolicies']
                                            ['permissions']
                                            ['secrets'])

                secrets_array = []
                for policy in keyvault.properties.access_policies or []:
                    if policy.object_id == vault_principal_id:
                        # Work on a copy: the policy may carry no secret permissions
                        # (None), and the fetched vault object should not be mutated.
                        secrets_array = list(policy.permissions.secrets or [])
                        break

                permissions_set = True
                for permission in vault_secret_permissions:
                    if permission not in secrets_array:
                        permissions_set = False
                        secrets_array.append(permission)

                if not permissions_set:
                    keyvault_update = True
                    keyvault = set_policy(cmd, keyvault_client, keyvault_rg, keyvault_name, object_id=vault_principal_id, secret_permissions=secrets_array)
                    keyvault = keyvault.result()

            from azure.cli.command_modules.keyvault.custom import update_vault_setter

            # If network ACLs are configured with no bypass, switch the bypass to 'AzureServices'.
            if keyvault.properties.network_acls:
                if keyvault.properties.network_acls.bypass == 'None':
                    keyvault_update = True
                    keyvault.properties.network_acls.bypass = 'AzureServices'
                    # NOTE(review): the return value is not awaited here - confirm whether
                    # update_vault_setter returns a poller that should be waited on.
                    update_vault_setter(cmd, keyvault_client, keyvault, resource_group_name=keyvault_rg, vault_name=keyvault_name)

            if keyvault_update:
                role_assignments_arr.append(helper.get_permission_object_from_keyvault(keyvault))

        if 'backupVaultPermissions' in manifest:
            for role_object in manifest['backupVaultPermissions']:
                role_assignments_arr = helper.check_and_assign_roles(cmd, permissions_scope=permissions_scope, role_object=role_object,
                                                                     backup_instance=backup_instance, principal_id=vault_principal_id,
                                                                     role_assignments_arr=role_assignments_arr)

        if 'dataSourcePermissions' in manifest:
            datasource_principal_id = helper.get_datasource_principal_id_from_object(cmd, datasource_type,
                                                                                     backup_instance=backup_instance)
            for role_object in manifest['dataSourcePermissions']:
                role_assignments_arr = helper.check_and_assign_roles(cmd, permissions_scope=permissions_scope, role_object=role_object,
                                                                     backup_instance=backup_instance, principal_id=datasource_principal_id,
                                                                     role_assignments_arr=role_assignments_arr)

        # Network line of sight access on server, if that is the datasource type
        if datasource_type == 'AzureDatabaseForPostgreSQL':
            server_params = parse_resource_id(data_source_info['resource_id'])
            server_sub = server_params['subscription']
            server_name = server_params['name']
            server_rg = server_params['resource_group']

            from azure.mgmt.rdbms.postgresql import PostgreSQLManagementClient
            postgres_firewall_client = getattr(get_mgmt_service_client(cmd.cli_ctx, PostgreSQLManagementClient, subscription_id=server_sub), 'firewall_rules')

            firewall_rule_list = postgres_firewall_client.list_by_server(resource_group_name=server_rg, server_name=server_name)

            # A rule whose start and end addresses are both 0.0.0.0 is treated as
            # "allow access to Azure IPs" already being enabled on the server.
            allow_access_to_azure_ips = any(
                rule.start_ip_address == rule.end_ip_address and rule.start_ip_address == '0.0.0.0'
                for rule in firewall_rule_list
            )

            if not allow_access_to_azure_ips:
                firewall_rule_name = 'AllowAllWindowsAzureIps'
                parameters = {'name': firewall_rule_name, 'start_ip_address': '0.0.0.0', 'end_ip_address': '0.0.0.0'}

                rule_poller = postgres_firewall_client.begin_create_or_update(server_rg, server_name, firewall_rule_name, parameters)
                role_assignments_arr.append(helper.get_permission_object_from_server_firewall_rule(rule_poller.result()))
    elif operation == "Restore":
        if datasource_type not in ("AzureKubernetesService", "AzureDatabaseForMySQL",
                                   "AzureDatabaseForPostgreSQLFlexibleServer"):
            raise InvalidArgumentValueError("Set permissions for restore is currently not supported for given DataSourceType")

        for role_object in manifest['backupVaultRestorePermissions']:
            role_assignments_arr = helper.check_and_assign_roles(cmd, permissions_scope=permissions_scope, role_object=role_object,
                                                                 restore_request_object=restore_request_object, principal_id=vault_principal_id,
                                                                 role_assignments_arr=role_assignments_arr,
                                                                 target_storage_account_id=target_storage_account_id,
                                                                 snapshot_resource_group_id=snapshot_resource_group_id)

        # Guard on the key that is actually iterated below, so a manifest lacking
        # restore-specific datasource permissions is skipped instead of raising KeyError.
        if 'dataSourceRestorePermissions' in manifest:
            datasource_principal_id = helper.get_datasource_principal_id_from_object(cmd, datasource_type,
                                                                                     restore_request_object=restore_request_object)
            for role_object in manifest['dataSourceRestorePermissions']:
                role_assignments_arr = helper.check_and_assign_roles(cmd, permissions_scope=permissions_scope, role_object=role_object,
                                                                     restore_request_object=restore_request_object, principal_id=datasource_principal_id,
                                                                     role_assignments_arr=role_assignments_arr,
                                                                     target_storage_account_id=target_storage_account_id,
                                                                     snapshot_resource_group_id=snapshot_resource_group_id)

    if not role_assignments_arr:
        logger.warning("The required permissions are already assigned!")
    else:
        # Wait for 60 seconds to let the role assignments propagate
        logger.warning("Waiting for 60 seconds for permissions to propagate")
        time.sleep(60)

    return role_assignments_arr