in src/storage-preview/azext_storage_preview/operations/account.py [0:0]
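def update_storage_account(cmd, instance, sku=None, tags=None, custom_domain=None, use_subdomain=None,
                           encryption_services=None, encryption_key_source=None, encryption_key_version=None,
                           encryption_key_name=None, encryption_key_vault=None, enable_files_aadkerb=None,
                           access_tier=None, https_only=None, enable_sftp=None, enable_local_user=None,
                           enable_files_aadds=None, assign_identity=False,
                           bypass=None, default_action=None, enable_large_file_share=None, enable_files_adds=None,
                           domain_name=None, net_bios_domain_name=None, forest_name=None, domain_guid=None,
                           domain_sid=None, azure_storage_sid=None, sam_account_name=None, account_type=None,
                           routing_choice=None, publish_microsoft_endpoints=None, publish_internet_endpoints=None,
                           allow_blob_public_access=None, min_tls_version=None, allow_shared_key_access=None,
                           identity_type=None, user_identity_id=None,
                           key_vault_user_identity_id=None, federated_identity_client_id=None,
                           sas_expiration_period=None, key_expiration_period_in_days=None,
                           allow_cross_tenant_replication=None, default_share_permission=None,
                           immutability_period_since_creation_in_days=None, immutability_policy_state=None,
                           allow_protected_append_writes=None, public_network_access=None, allowed_copy_scope=None,
                           enable_extended_groups=None):
    """Build the ``StorageAccountUpdateParameters`` for an existing storage account.

    The current account (``instance``) is the starting point and every option the caller
    supplied is overlaid on it. For sku, tags, custom domain, encryption, access tier and
    HTTPS-only, an omitted option carries the current value forward; every other property
    is only written onto the result when its option was supplied.

    The following options map directly onto security-relevant account properties and are
    written verbatim when supplied: ``https_only``, ``min_tls_version``,
    ``allow_blob_public_access``, ``allow_shared_key_access``, ``public_network_access``,
    ``bypass`` / ``default_action``, ``enable_sftp`` and ``enable_local_user``.

    :param cmd: CLI command object; supplies ``get_models`` and ``cli_ctx``.
    :param instance: the storage account as it currently exists.
    :returns: the populated ``StorageAccountUpdateParameters`` object.
    :raises ValueError: a key vault option (name, vault URI or version, including an empty
        version string) was given while the key source is not ``Microsoft.Keyvault``.
    :raises CLIError: the requested Azure Files directory service conflicts with the one
        already enabled, the Active Directory properties are incomplete (enable) or present
        (disable), or ``bypass`` was given without a default action or an existing rule set.
    """
    StorageAccountUpdateParameters, Sku, CustomDomain, AccessTier, Identity, Encryption, NetworkRuleSet = \
        cmd.get_models('StorageAccountUpdateParameters', 'Sku', 'CustomDomain', 'AccessTier', 'Identity',
                       'Encryption', 'NetworkRuleSet')

    domain = instance.custom_domain
    if custom_domain is not None:
        domain = CustomDomain(name=custom_domain)
        if use_subdomain is not None:
            # The option arrives as the string 'true'/'false', not as a bool.
            domain.use_sub_domain_name = use_subdomain == 'true'

    encryption = instance.encryption
    if not encryption and any((encryption_services, encryption_key_source, encryption_key_name,
                               encryption_key_vault, encryption_key_version is not None)):
        encryption = Encryption()
    if encryption_services:
        encryption.services = encryption_services
    if encryption_key_source:
        encryption.key_source = encryption_key_source

    # The account may carry no encryption object at all; when no encryption option was
    # supplied either, there is nothing to reconcile and None is passed through unchanged.
    if encryption is not None:
        if encryption.key_source == "Microsoft.Keyvault":
            if encryption.key_vault_properties is None:
                KeyVaultProperties = cmd.get_models('KeyVaultProperties')
                encryption.key_vault_properties = KeyVaultProperties()
        else:
            # The version is tested with `is not None` so that an empty string (a value the
            # assignment further down accepts) is rejected here with a clear message rather
            # than failing later on the cleared key vault properties.
            if any([encryption_key_name, encryption_key_vault, encryption_key_version is not None]):
                raise ValueError(
                    'Specify `--encryption-key-source=Microsoft.Keyvault` to configure key vault properties.')
            # Any other key source drops the customer-managed key reference.
            if encryption.key_vault_properties is not None:
                encryption.key_vault_properties = None

    if encryption_key_name:
        encryption.key_vault_properties.key_name = encryption_key_name
    if encryption_key_vault:
        encryption.key_vault_properties.key_vault_uri = encryption_key_vault
    if encryption_key_version is not None:
        encryption.key_vault_properties.key_version = encryption_key_version

    params = StorageAccountUpdateParameters(
        sku=Sku(name=sku) if sku is not None else instance.sku,
        tags=tags if tags is not None else instance.tags,
        custom_domain=domain,
        encryption=encryption,
        access_tier=AccessTier(access_tier) if access_tier is not None else instance.access_tier,
        enable_https_traffic_only=https_only if https_only is not None else instance.enable_https_traffic_only
    )

    if identity_type and 'UserAssigned' in identity_type and user_identity_id:
        user_assigned_identities = {user_identity_id: {}}
        # An account without any managed identity has instance.identity set to None.
        existing_identity = instance.identity
        if existing_identity and existing_identity.user_assigned_identities:
            # Every other identity currently attached is mapped to None. Presumably the
            # service reads None as a request to detach that identity - confirm, because
            # assigning one identity this way would then drop all the others.
            for item in existing_identity.user_assigned_identities:
                if item != user_identity_id:
                    user_assigned_identities[item] = None
        params.identity = Identity(type=identity_type, user_assigned_identities=user_assigned_identities)
    elif identity_type:
        params.identity = Identity(type=identity_type)

    if key_vault_user_identity_id is not None or federated_identity_client_id is not None:
        EncryptionIdentity = cmd.get_models('EncryptionIdentity')
        if params.encryption is None:
            # NOTE(review): an Encryption object that carries only an identity may not be
            # accepted by the service; confirm before relying on this path.
            params.encryption = Encryption()
        original_encryption_identity = params.encryption.encryption_identity or EncryptionIdentity()
        # A falsy option (None or empty string) keeps the value already on the account.
        params.encryption.encryption_identity = EncryptionIdentity(
            encryption_user_assigned_identity=(
                key_vault_user_identity_id if key_vault_user_identity_id
                else original_encryption_identity.encryption_user_assigned_identity),
            encryption_federated_identity_client_id=(
                federated_identity_client_id if federated_identity_client_id
                else original_encryption_identity.encryption_federated_identity_client_id)
        )

    # Azure Files identity-based authentication. Only one directory service option
    # ('AADDS', 'AADKERB' or 'AD') is held at a time, so enabling one is refused while
    # another is active, and disabling one leaves any other configuration untouched.
    AzureFilesIdentityBasedAuthentication = cmd.get_models('AzureFilesIdentityBasedAuthentication')
    if enable_files_aadds is not None:
        if enable_files_aadds:  # enable AADDS
            origin_storage_account = get_storage_account_properties(cmd.cli_ctx, instance.id)
            files_auth = origin_storage_account.azure_files_identity_based_authentication
            if files_auth and files_auth.directory_service_options == 'AD':
                raise CLIError("The Storage account already enabled ActiveDirectoryDomainServicesForFile, "
                               "please disable it by running this cmdlets with \"--enable-files-adds false\" "
                               "before enable AzureActiveDirectoryDomainServicesForFile.")
            params.azure_files_identity_based_authentication = AzureFilesIdentityBasedAuthentication(
                directory_service_options='AADDS')
        else:  # Only disable AADDS and keep others unchanged
            origin_storage_account = get_storage_account_properties(cmd.cli_ctx, instance.id)
            files_auth = origin_storage_account.azure_files_identity_based_authentication
            if not files_auth or files_auth.directory_service_options == 'AADDS':
                params.azure_files_identity_based_authentication = AzureFilesIdentityBasedAuthentication(
                    directory_service_options='None')
            else:
                params.azure_files_identity_based_authentication = files_auth

    if enable_files_aadkerb is not None:
        if enable_files_aadkerb:  # enable AADKERB
            origin_storage_account = get_storage_account_properties(cmd.cli_ctx, instance.id)
            files_auth = origin_storage_account.azure_files_identity_based_authentication
            if files_auth and files_auth.directory_service_options == 'AADDS':
                raise CLIError("The Storage account already enabled AzureActiveDirectoryDomainServicesForFile, "
                               "please disable it by running this cmdlets with \"--enable-files-aadds false\" "
                               "before enable AzureActiveDirectoryKerberosForFile.")
            if files_auth and files_auth.directory_service_options == 'AD':
                raise CLIError("The Storage account already enabled ActiveDirectoryDomainServicesForFile, "
                               "please disable it by running this cmdlets with \"--enable-files-adds false\" "
                               "before enable AzureActiveDirectoryKerberosForFile.")
            active_directory_properties = None
            if domain_name or domain_guid:
                ActiveDirectoryProperties = cmd.get_models('ActiveDirectoryProperties')
                active_directory_properties = ActiveDirectoryProperties(domain_name=domain_name,
                                                                        domain_guid=domain_guid)
            params.azure_files_identity_based_authentication = AzureFilesIdentityBasedAuthentication(
                directory_service_options='AADKERB',
                active_directory_properties=active_directory_properties)
        else:  # disable AADKERB
            # Only disable AADKERB and keep others unchanged
            origin_storage_account = get_storage_account_properties(cmd.cli_ctx, instance.id)
            files_auth = origin_storage_account.azure_files_identity_based_authentication
            if not files_auth or files_auth.directory_service_options == 'AADKERB':
                params.azure_files_identity_based_authentication = AzureFilesIdentityBasedAuthentication(
                    directory_service_options='None')
            else:
                params.azure_files_identity_based_authentication = files_auth

    if enable_files_adds is not None:
        ActiveDirectoryProperties = cmd.get_models('ActiveDirectoryProperties')
        if enable_files_adds:  # enable AD
            if not (domain_name and net_bios_domain_name and forest_name and domain_guid and domain_sid and
                    azure_storage_sid):
                raise CLIError("To enable ActiveDirectoryDomainServicesForFile, user must specify all of: "
                               "--domain-name, --net-bios-domain-name, --forest-name, --domain-guid, --domain-sid and "
                               "--azure_storage_sid arguments in Azure Active Directory Properties Argument group.")
            origin_storage_account = get_storage_account_properties(cmd.cli_ctx, instance.id)
            files_auth = origin_storage_account.azure_files_identity_based_authentication
            if files_auth and files_auth.directory_service_options == 'AADDS':
                raise CLIError("The Storage account already enabled AzureActiveDirectoryDomainServicesForFile, "
                               "please disable it by running this cmdlets with \"--enable-files-aadds false\" "
                               "before enable ActiveDirectoryDomainServicesForFile.")
            if files_auth and files_auth.directory_service_options == 'AADKERB':
                raise CLIError("The Storage account already enabled AzureActiveDirectoryKerberosForFile, "
                               "please disable it by running this cmdlets with \"--enable-files-aadkerb false\" "
                               "before enable ActiveDirectoryDomainServicesForFile.")
            active_directory_properties = ActiveDirectoryProperties(domain_name=domain_name,
                                                                    net_bios_domain_name=net_bios_domain_name,
                                                                    forest_name=forest_name, domain_guid=domain_guid,
                                                                    domain_sid=domain_sid,
                                                                    azure_storage_sid=azure_storage_sid,
                                                                    sam_account_name=sam_account_name,
                                                                    account_type=account_type)
            # TODO: Enabling AD will automatically disable AADDS. Maybe we should throw error message
            params.azure_files_identity_based_authentication = AzureFilesIdentityBasedAuthentication(
                directory_service_options='AD',
                active_directory_properties=active_directory_properties)
        else:  # disable AD
            if domain_name or net_bios_domain_name or forest_name or domain_guid or domain_sid or azure_storage_sid:
                raise CLIError("To disable ActiveDirectoryDomainServicesForFile, user can't specify any of: "
                               "--domain-name, --net-bios-domain-name, --forest-name, --domain-guid, --domain-sid and "
                               "--azure_storage_sid arguments in Azure Active Directory Properties Argument group.")
            # Only disable AD and keep others unchanged
            origin_storage_account = get_storage_account_properties(cmd.cli_ctx, instance.id)
            files_auth = origin_storage_account.azure_files_identity_based_authentication
            if not files_auth or files_auth.directory_service_options == 'AD':
                params.azure_files_identity_based_authentication = AzureFilesIdentityBasedAuthentication(
                    directory_service_options='None')
            else:
                params.azure_files_identity_based_authentication = files_auth

    if default_share_permission is not None:
        if params.azure_files_identity_based_authentication is None:
            params.azure_files_identity_based_authentication = AzureFilesIdentityBasedAuthentication(
                directory_service_options='None')
        params.azure_files_identity_based_authentication.default_share_permission = default_share_permission

    if assign_identity:
        # Replaces whatever identity object was set from identity_type above.
        params.identity = Identity(type='SystemAssigned')
    if enable_large_file_share:
        LargeFileSharesState = cmd.get_models('LargeFileSharesState')
        params.large_file_shares_state = LargeFileSharesState("Enabled")

    if NetworkRuleSet:
        acl = instance.network_rule_set
        if acl:
            # The existing rule set is edited in place, so its virtual network and IP
            # rules are carried over untouched.
            if bypass:
                acl.bypass = bypass
            if default_action:
                acl.default_action = default_action
        elif default_action:
            acl = NetworkRuleSet(bypass=bypass, virtual_network_rules=None, ip_rules=None,
                                 default_action=default_action)
        elif bypass:
            # A bypass list has no rule set to attach to unless a default action is given.
            raise CLIError('incorrect usage: --default-action ACTION [--bypass SERVICE ...]')
        params.network_rule_set = acl

    if hasattr(params, 'routing_preference') and any([routing_choice, publish_microsoft_endpoints,
                                                      publish_internet_endpoints]):
        if params.routing_preference is None:
            RoutingPreference = cmd.get_models('RoutingPreference')
            params.routing_preference = RoutingPreference()
        if routing_choice is not None:
            params.routing_preference.routing_choice = routing_choice
        if publish_microsoft_endpoints is not None:
            params.routing_preference.publish_microsoft_endpoints = publish_microsoft_endpoints
        if publish_internet_endpoints is not None:
            params.routing_preference.publish_internet_endpoints = publish_internet_endpoints

    # Tri-state options: None means "not supplied", so an explicit False is still written.
    if allow_blob_public_access is not None:
        params.allow_blob_public_access = allow_blob_public_access
    if min_tls_version:
        params.minimum_tls_version = min_tls_version
    if allow_shared_key_access is not None:
        params.allow_shared_key_access = allow_shared_key_access

    if key_expiration_period_in_days is not None:
        KeyPolicy = cmd.get_models('KeyPolicy')
        params.key_policy = KeyPolicy(key_expiration_period_in_days=key_expiration_period_in_days)
    if sas_expiration_period:
        SasPolicy = cmd.get_models('SasPolicy')
        params.sas_policy = SasPolicy(sas_expiration_period=sas_expiration_period)

    if allow_cross_tenant_replication is not None:
        params.allow_cross_tenant_replication = allow_cross_tenant_replication

    if any([immutability_period_since_creation_in_days, immutability_policy_state,
            allow_protected_append_writes is not None]):
        ImmutableStorageAccount = cmd.get_models('ImmutableStorageAccount')
        AccountImmutabilityPolicyProperties = cmd.get_models('AccountImmutabilityPolicyProperties')
        immutability_policy = AccountImmutabilityPolicyProperties(
            immutability_period_since_creation_in_days=immutability_period_since_creation_in_days,
            state=immutability_policy_state,
            allow_protected_append_writes=allow_protected_append_writes
        )
        params.immutable_storage_with_versioning = ImmutableStorageAccount(enabled=None,
                                                                           immutability_policy=immutability_policy)

    if public_network_access is not None:
        params.public_network_access = public_network_access
    if allowed_copy_scope is not None:
        params.allowed_copy_scope = allowed_copy_scope
    if enable_sftp is not None:
        params.is_sftp_enabled = enable_sftp
    if enable_local_user is not None:
        params.is_local_user_enabled = enable_local_user
    if enable_extended_groups is not None:
        params.enable_extended_groups = enable_extended_groups

    return params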