in src/dataprotection/azext_dataprotection/manual/custom.py [0:0]
def dataprotection_backup_instance_update_msi_permissions(cmd, resource_group_name, datasource_type, vault_name, operation,
permissions_scope, backup_instance=None, restore_request_object=None,
keyvault_id=None, snapshot_resource_group_id=None,
user_assigned_identity_arm_url=None,
target_storage_account_id=None, yes=False):
if operation == 'Backup' and backup_instance is None:
raise RequiredArgumentMissingError("--backup-instance needs to be given when --operation is given as Backup")
if operation == "Restore" and restore_request_object is None:
raise RequiredArgumentMissingError("--restore-request-object needs to be given when --operation is given as Restore")
if datasource_type == 'AzureDatabaseForPostgreSQL':
if not keyvault_id:
raise RequiredArgumentMissingError("--keyvault-id needs to be given when --datasource-type is AzureDatabaseForPostgreSQL")
if not is_valid_resource_id(keyvault_id):
raise InvalidArgumentValueError("Please provide a valid keyvault ID")
manifest = helper.load_manifest(datasource_type)
warning_message = helper.get_help_text_on_grant_permissions_templatized(datasource_type, operation)
if not yes and not prompt_y_n(warning_message):
return None
backup_vault = BackupVaultGet(cli_ctx=cmd.cli_ctx)(command_args={
"resource_group": resource_group_name,
"vault_name": vault_name
})
vault_principal_id = helper.get_vault_identity(backup_vault, user_assigned_identity_arm_url)
role_assignments_arr = []
if operation == "Backup":
if helper.datasource_map[datasource_type] != backup_instance["properties"]["data_source_info"]["datasource_type"]:
raise InvalidArgumentValueError("--backup-instance provided is not compatible with the --datasource-type.")
if backup_instance['properties']['data_source_info']['resource_location'] != backup_vault['location']:
raise InvalidArgumentValueError("Location of data source needs to be the same as backup vault.\nMake sure the datasource "
"and vault are chosen properly")
keyvault_client = None
keyvault = None
keyvault_subscription = None
keyvault_name = None
keyvault_rg = None
if manifest['supportSecretStoreAuthentication']:
cmd.command_kwargs['operation_group'] = 'vaults'
keyvault_update = False
from azure.cli.core.profiles import ResourceType
from azure.cli.command_modules.keyvault._client_factory import Clients, data_plane_azure_keyvault_secret_client
keyvault_params = parse_resource_id(keyvault_id)
keyvault_subscription = keyvault_params['subscription']
keyvault_name = keyvault_params['name']
keyvault_rg = keyvault_params['resource_group']
keyvault_client = getattr(get_mgmt_service_client(cmd.cli_ctx, ResourceType.MGMT_KEYVAULT, subscription_id=keyvault_subscription), Clients.vaults)
keyvault = keyvault_client.get(resource_group_name=keyvault_rg, vault_name=keyvault_name)
# Check if keyvault is not publicly accessible
if keyvault.properties.public_network_access == 'Disabled':
raise UnauthorizedError("Keyvault has public access disabled. Please enable public access, or grant access to your client IP")
# Check if the secret URI provided in backup instance is a valid secret
cmd.command_kwargs['vault_base_url'] = keyvault.properties.vault_uri
data_secrets_client = data_plane_azure_keyvault_secret_client(cmd.cli_ctx, cmd.command_kwargs)
secrets_list = data_secrets_client.list_properties_of_secrets()
given_secret_uri = backup_instance['properties']['datasource_auth_credentials']['secret_store_resource']['uri']
given_secret_id = helper.get_secret_params_from_uri(given_secret_uri)['secret_id']
valid_secret = False
for secret in secrets_list:
if given_secret_id == secret.id:
valid_secret = True
break
if not valid_secret:
raise InvalidArgumentValueError("The secret URI provided in the --backup-instance is not associated with the "
"--keyvault-id provided. Please input a valid combination of secret URI and "
"--keyvault-id.")
keyvault_permission_models = manifest['secretStorePermissions']
if keyvault.properties.enable_rbac_authorization:
role = keyvault_permission_models['rbacModel']['roleDefinitionName']
keyvault_assignment_scope = helper.truncate_id_using_scope(keyvault_id, permissions_scope)
role_assignment = list_role_assignments(cmd, assignee=vault_principal_id, role=role, scope=keyvault_id, include_inherited=True)
if not role_assignment:
assignment = create_role_assignment(cmd, assignee=vault_principal_id, role=role, scope=keyvault_assignment_scope)
role_assignments_arr.append(helper.get_permission_object_from_role_object(assignment))
else:
from azure.cli.command_modules.keyvault.custom import set_policy
vault_secret_permissions = (keyvault_permission_models['vaultAccessPolicyModel']
['accessPolicies']
['permissions']
['secrets'])
secrets_array = []
for policy in keyvault.properties.access_policies:
if policy.object_id == vault_principal_id:
secrets_array = policy.permissions.secrets
break
permissions_set = True
for permission in vault_secret_permissions:
if permission not in secrets_array:
permissions_set = False
secrets_array.append(permission)
if not permissions_set:
keyvault_update = True
keyvault = set_policy(cmd, keyvault_client, keyvault_rg, keyvault_name, object_id=vault_principal_id, secret_permissions=secrets_array)
keyvault = keyvault.result()
from azure.cli.command_modules.keyvault.custom import update_vault_setter
if keyvault.properties.network_acls:
if keyvault.properties.network_acls.bypass == 'None':
keyvault_update = True
keyvault.properties.network_acls.bypass = 'AzureServices'
update_vault_setter(cmd, keyvault_client, keyvault, resource_group_name=keyvault_rg, vault_name=keyvault_name)
if keyvault_update:
role_assignments_arr.append(helper.get_permission_object_from_keyvault(keyvault))
if 'backupVaultPermissions' in manifest:
for role_object in manifest['backupVaultPermissions']:
role_assignments_arr = helper.check_and_assign_roles(cmd, permissions_scope=permissions_scope, role_object=role_object,
backup_instance=backup_instance, principal_id=vault_principal_id,
role_assignments_arr=role_assignments_arr)
if 'dataSourcePermissions' in manifest:
datasource_principal_id = helper.get_datasource_principal_id_from_object(cmd, datasource_type,
backup_instance=backup_instance)
for role_object in manifest['dataSourcePermissions']:
role_assignments_arr = helper.check_and_assign_roles(cmd, permissions_scope=permissions_scope, role_object=role_object,
backup_instance=backup_instance, principal_id=datasource_principal_id,
role_assignments_arr=role_assignments_arr)
# Network line of sight access on server, if that is the datasource type
if datasource_type == 'AzureDatabaseForPostgreSQL':
server_params = parse_resource_id(backup_instance['properties']['data_source_info']['resource_id'])
server_sub = server_params['subscription']
server_name = server_params['name']
server_rg = server_params['resource_group']
from azure.mgmt.rdbms.postgresql import PostgreSQLManagementClient
postgres_firewall_client = getattr(get_mgmt_service_client(cmd.cli_ctx, PostgreSQLManagementClient, subscription_id=server_sub), 'firewall_rules')
firewall_rule_list = postgres_firewall_client.list_by_server(resource_group_name=server_rg, server_name=server_name)
allow_access_to_azure_ips = False
for rule in firewall_rule_list:
if rule.start_ip_address == rule.end_ip_address and rule.start_ip_address == '0.0.0.0':
allow_access_to_azure_ips = True
break
if not allow_access_to_azure_ips:
firewall_rule_name = 'AllowAllWindowsAzureIps'
parameters = {'name': firewall_rule_name, 'start_ip_address': '0.0.0.0', 'end_ip_address': '0.0.0.0'}
rule = postgres_firewall_client.begin_create_or_update(server_rg, server_name, firewall_rule_name, parameters)
role_assignments_arr.append(helper.get_permission_object_from_server_firewall_rule(rule.result()))
elif operation == "Restore":
if datasource_type not in ("AzureKubernetesService", "AzureDatabaseForMySQL",
"AzureDatabaseForPostgreSQLFlexibleServer"):
raise InvalidArgumentValueError("Set permissions for restore is currently not supported for given DataSourceType")
for role_object in manifest['backupVaultRestorePermissions']:
role_assignments_arr = helper.check_and_assign_roles(cmd, permissions_scope=permissions_scope, role_object=role_object,
restore_request_object=restore_request_object, principal_id=vault_principal_id,
role_assignments_arr=role_assignments_arr,
target_storage_account_id=target_storage_account_id,
snapshot_resource_group_id=snapshot_resource_group_id)
if 'dataSourcePermissions' in manifest:
datasource_principal_id = helper.get_datasource_principal_id_from_object(cmd, datasource_type,
restore_request_object=restore_request_object)
for role_object in manifest['dataSourceRestorePermissions']:
role_assignments_arr = helper.check_and_assign_roles(cmd, permissions_scope=permissions_scope, role_object=role_object,
restore_request_object=restore_request_object, principal_id=datasource_principal_id,
role_assignments_arr=role_assignments_arr,
target_storage_account_id=target_storage_account_id,
snapshot_resource_group_id=snapshot_resource_group_id)
if not role_assignments_arr:
logger.warning("The required permissions are already assigned!")
else:
# Wait for 60 seconds to let the role assignments propagate
logger.warning("Waiting for 60 seconds for permissions to propagate")
time.sleep(60)
return role_assignments_arr