in src/azure-cli/azure/cli/command_modules/storage/_validators.py [0:0]
def validate_source_url(cmd, namespace): # pylint: disable=too-many-statements, too-many-locals
from .util import create_short_lived_blob_sas, create_short_lived_blob_sas_v2, create_short_lived_file_sas, \
create_short_lived_file_sas_v2
from azure.cli.core.azclierror import InvalidArgumentValueError, RequiredArgumentMissingError, \
MutuallyExclusiveArgumentError
usage_string = \
'Invalid usage: {}. Supply only one of the following argument sets to specify source:' \
'\n\t --source-uri [--source-sas]' \
'\n\tOR --source-container --source-blob [--source-account-name & sas] [--source-snapshot]' \
'\n\tOR --source-container --source-blob [--source-account-name & key] [--source-snapshot]' \
'\n\tOR --source-share --source-path' \
'\n\tOR --source-share --source-path [--source-account-name & sas]' \
'\n\tOR --source-share --source-path [--source-account-name & key]'
ns = vars(namespace)
# source as blob
container = ns.pop('source_container', None)
blob = ns.pop('source_blob', None)
snapshot = ns.pop('source_snapshot', None)
# source as file
share = ns.pop('source_share', None)
path = ns.pop('source_path', None)
file_snapshot = ns.pop('file_snapshot', None)
# source credential clues
source_account_name = ns.pop('source_account_name', None)
source_account_key = ns.pop('source_account_key', None)
source_sas = ns.pop('source_sas', None)
token_credential = ns.get('token_credential')
is_oauth = token_credential is not None
# source in the form of an uri
uri = ns.get('source_url', None)
if uri:
if any([container, blob, snapshot, share, path, file_snapshot, source_account_name,
source_account_key]):
raise InvalidArgumentValueError(usage_string.format(
'Unused parameters are given in addition to the source URI'))
if source_sas:
source_sas = source_sas.lstrip('?')
uri = '{}{}{}'.format(uri, '?', source_sas)
namespace.copy_source = uri
return
# ensure either a file or blob source is specified
valid_blob_source = container and blob and not share and not path and not file_snapshot
valid_file_source = share and path and not container and not blob and not snapshot
if not valid_blob_source and not valid_file_source:
raise RequiredArgumentMissingError(usage_string.format('Neither a valid blob or file source is specified'))
if valid_blob_source and valid_file_source:
raise MutuallyExclusiveArgumentError(usage_string.format(
'Ambiguous parameters, both blob and file sources are specified'))
validate_client_parameters(cmd, namespace) # must run first to resolve storage account
if not source_account_name:
if source_account_key:
raise RequiredArgumentMissingError(usage_string.format(
'Source account key is given but account name is not'))
# assume that user intends to copy blob in the same account
source_account_name = ns.get('account_name', None)
# determine if the copy will happen in the same storage account
same_account = False
if not source_account_key and not source_sas and not is_oauth:
if source_account_name == ns.get('account_name', None):
same_account = True
source_account_key = ns.get('account_key', None)
source_sas = ns.get('sas_token', None)
else:
# the source account is different from destination account but the key is missing try to query one.
try:
source_account_key = _query_account_key(cmd.cli_ctx, source_account_name)
except ValueError:
raise RequiredArgumentMissingError('Source storage account {} not found.'.format(source_account_name))
# if oauth, use user delegation key to generate sas
source_user_delegation_key = None
if is_oauth:
client_kwargs = {'account_name': source_account_name,
'token_credential': token_credential}
if valid_blob_source:
client = cf_blob_service(cmd.cli_ctx, client_kwargs)
from datetime import datetime, timedelta
start = datetime.utcnow()
expiry = datetime.utcnow() + timedelta(days=1)
source_user_delegation_key = client.get_user_delegation_key(start, expiry)
# Both source account name and either key or sas (or both) are now available
if not source_sas:
prefix = cmd.command_kwargs['resource_type'].value[0]
# generate a sas token even in the same account when the source and destination are not the same kind.
if valid_file_source and (ns.get('container_name', None) or not same_account):
dir_name, file_name = os.path.split(path) if path else (None, '')
if dir_name == '':
dir_name = None
if is_storagev2(prefix):
source_sas = create_short_lived_file_sas_v2(cmd, source_account_name, source_account_key, share,
dir_name, file_name)
else:
source_sas = create_short_lived_file_sas(cmd, source_account_name, source_account_key, share,
dir_name, file_name)
elif valid_blob_source and (ns.get('share_name', None) or not same_account):
prefix = cmd.command_kwargs['resource_type'].value[0]
# is_storagev2() is used to distinguish if the command is in track2 SDK
# If yes, we will use get_login_credentials() as token credential
if is_storagev2(prefix):
source_sas = create_short_lived_blob_sas_v2(cmd, source_account_name, container, blob,
account_key=source_account_key,
user_delegation_key=source_user_delegation_key)
else:
source_sas = create_short_lived_blob_sas(cmd, source_account_name, source_account_key, container, blob)
query_params = []
if source_sas:
query_params.append(source_sas.lstrip('?'))
if snapshot:
query_params.append('snapshot={}'.format(snapshot))
if file_snapshot:
query_params.append('sharesnapshot={}'.format(file_snapshot))
uri = 'https://{0}.{1}.{6}/{2}/{3}{4}{5}'.format(
source_account_name,
'blob' if valid_blob_source else 'file',
container if valid_blob_source else share,
encode_for_url(blob if valid_blob_source else path),
'?' if query_params else '',
'&'.join(query_params),
cmd.cli_ctx.cloud.suffixes.storage_endpoint)
namespace.source_url = uri