def validate_source_url()

in src/azure-cli/azure/cli/command_modules/storage/_validators.py [0:0]


def validate_source_url(cmd, namespace):  # pylint: disable=too-many-statements, too-many-locals
    """Collapse the various ``--source-*`` arguments into a single ``namespace.source_url``.

    The source may be given either directly as a URI (``source_url``, optionally with
    ``source_sas``) or as a blob (``source_container`` + ``source_blob``) or a file
    (``source_share`` + ``source_path``) plus optional credential hints. All of the
    ``source_*`` helper arguments are popped from the namespace; on success the final URL
    is stored in ``namespace.source_url``.

    :param cmd: the command object; ``cmd.cli_ctx`` and ``cmd.command_kwargs['resource_type']`` are read.
    :param namespace: the parsed argument namespace, modified in place.
    :raises InvalidArgumentValueError: a source URI is combined with other source arguments.
    :raises RequiredArgumentMissingError: no complete blob/file source is given, a source key is
        given without a source account name, or the source account key cannot be queried.
    :raises MutuallyExclusiveArgumentError: both a blob and a file source are considered valid.
    """
    from .util import create_short_lived_blob_sas, create_short_lived_blob_sas_v2, create_short_lived_file_sas, \
        create_short_lived_file_sas_v2
    from azure.cli.core.azclierror import InvalidArgumentValueError, RequiredArgumentMissingError, \
        MutuallyExclusiveArgumentError
    usage_string = \
        'Invalid usage: {}. Supply only one of the following argument sets to specify source:' \
        '\n\t   --source-uri [--source-sas]' \
        '\n\tOR --source-container --source-blob [--source-account-name & sas] [--source-snapshot]' \
        '\n\tOR --source-container --source-blob [--source-account-name & key] [--source-snapshot]' \
        '\n\tOR --source-share --source-path' \
        '\n\tOR --source-share --source-path [--source-account-name & sas]' \
        '\n\tOR --source-share --source-path [--source-account-name & key]'

    ns = vars(namespace)

    # source as blob
    container = ns.pop('source_container', None)
    blob = ns.pop('source_blob', None)
    snapshot = ns.pop('source_snapshot', None)

    # source as file
    share = ns.pop('source_share', None)
    path = ns.pop('source_path', None)
    file_snapshot = ns.pop('file_snapshot', None)

    # source credential clues
    source_account_name = ns.pop('source_account_name', None)
    source_account_key = ns.pop('source_account_key', None)
    source_sas = ns.pop('source_sas', None)
    token_credential = ns.get('token_credential')
    is_oauth = token_credential is not None

    # source in the form of an uri
    uri = ns.get('source_url', None)
    if uri:
        if any([container, blob, snapshot, share, path, file_snapshot, source_account_name,
                source_account_key]):
            raise InvalidArgumentValueError(usage_string.format(
                'Unused parameters are given in addition to the source URI'))
        if source_sas:
            # Write back to the attribute that was read (source_url); append with '&' when the
            # URI already carries a query string so the result stays a well-formed URL.
            separator = '&' if '?' in uri else '?'
            namespace.source_url = '{}{}{}'.format(uri, separator, source_sas.lstrip('?'))
        return

    # ensure either a file or blob source is specified
    valid_blob_source = container and blob and not share and not path and not file_snapshot
    valid_file_source = share and path and not container and not blob and not snapshot

    if not valid_blob_source and not valid_file_source:
        raise RequiredArgumentMissingError(usage_string.format('Neither a valid blob or file source is specified'))
    if valid_blob_source and valid_file_source:
        raise MutuallyExclusiveArgumentError(usage_string.format(
            'Ambiguous parameters, both blob and file sources are specified'))

    validate_client_parameters(cmd, namespace)  # must run first to resolve storage account

    if not source_account_name:
        if source_account_key:
            raise RequiredArgumentMissingError(usage_string.format(
                'Source account key is given but account name is not'))
        # assume that user intends to copy blob in the same account
        source_account_name = ns.get('account_name', None)

    # determine if the copy will happen in the same storage account
    same_account = False

    if not source_account_key and not source_sas and not is_oauth:
        if source_account_name == ns.get('account_name', None):
            same_account = True
            source_account_key = ns.get('account_key', None)
            source_sas = ns.get('sas_token', None)
        else:
            # the source account is different from destination account but the key is missing try to query one.
            try:
                source_account_key = _query_account_key(cmd.cli_ctx, source_account_name)
            except ValueError as ex:
                raise RequiredArgumentMissingError(
                    'Source storage account {} not found.'.format(source_account_name)) from ex

    # if oauth, use user delegation key to generate sas
    # The delegation key is obtained from the blob service client and is only consumed by the
    # blob SAS helper below, so it is requested for blob sources only. For file sources no
    # client is created and the key stays None.
    source_user_delegation_key = None
    if is_oauth and valid_blob_source:
        from datetime import datetime, timedelta
        client = cf_blob_service(cmd.cli_ctx, {'account_name': source_account_name,
                                               'token_credential': token_credential})
        start = datetime.utcnow()
        expiry = start + timedelta(days=1)
        source_user_delegation_key = client.get_user_delegation_key(start, expiry)

    # Both source account name and either key or sas (or both) are now available
    if not source_sas:
        # is_storagev2() is used to distinguish if the command is in track2 SDK
        prefix = cmd.command_kwargs['resource_type'].value[0]
        # generate a sas token even in the same account when the source and destination are not the same kind.
        if valid_file_source and (ns.get('container_name', None) or not same_account):
            dir_name, file_name = os.path.split(path) if path else (None, '')
            if dir_name == '':
                dir_name = None
            if is_storagev2(prefix):
                source_sas = create_short_lived_file_sas_v2(cmd, source_account_name, source_account_key, share,
                                                            dir_name, file_name)
            else:
                source_sas = create_short_lived_file_sas(cmd, source_account_name, source_account_key, share,
                                                         dir_name, file_name)
        elif valid_blob_source and (ns.get('share_name', None) or not same_account):
            if is_storagev2(prefix):
                # track2: either the account key or the user delegation key signs the SAS
                source_sas = create_short_lived_blob_sas_v2(cmd, source_account_name, container, blob,
                                                            account_key=source_account_key,
                                                            user_delegation_key=source_user_delegation_key)
            else:
                source_sas = create_short_lived_blob_sas(cmd, source_account_name, source_account_key, container, blob)

    # Assemble the query string: SAS first, then the optional snapshot selectors.
    query_params = []
    if source_sas:
        query_params.append(source_sas.lstrip('?'))
    if snapshot:
        query_params.append('snapshot={}'.format(snapshot))
    if file_snapshot:
        query_params.append('sharesnapshot={}'.format(file_snapshot))

    uri = 'https://{0}.{1}.{6}/{2}/{3}{4}{5}'.format(
        source_account_name,
        'blob' if valid_blob_source else 'file',
        container if valid_blob_source else share,
        encode_for_url(blob if valid_blob_source else path),
        '?' if query_params else '',
        '&'.join(query_params),
        cmd.cli_ctx.cloud.suffixes.storage_endpoint)

    namespace.source_url = uri