azure-devops/azext_devops/dev/common/services.py (327 lines of code) (raw):

# -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- import datetime import os from collections import OrderedDict from msrest.authentication import BasicAuthentication from azure.cli.core._profile import Profile from knack.log import get_logger from knack.util import CLIError from azext_devops.devops_sdk.connection import Connection from azext_devops.version import VERSION from .arguments import should_detect from .const import (DEFAULTS_SECTION, DEVOPS_ORGANIZATION_DEFAULT, DEVOPS_TEAM_PROJECT_DEFAULT, PAT_ENV_VARIABLE_NAME, ORG_PRESENT_IN_COMMAND, PROJECT_PRESENT_IN_COMMAND, REPO_PRESENT_IN_COMMAND, ORG_PICKED_FROM_GIT, PROJECT_PICKED_FROM_GIT, REPO_PICKED_FROM_GIT, ORG_PICKED_FROM_CONFIG, ORG_IGNORED_FROM_CONFIG, PROJECT_PICKED_FROM_CONFIG, PROJECT_IGNORED_FROM_CONFIG) from ._credentials import get_credential from .git import get_remote_url from .vsts_git_url_info import VstsGitUrlInfo from .uri import uri_parse_instance_from_git_uri, is_valid_url from .uuid import is_uuid from .telemetry import vsts_tracking_data, init_telemetry logger = get_logger(__name__) def get_connection(organization): organization = organization.lower() if organization not in _connection: credentials = _get_credentials(organization) try: from .telemetry import try_send_telemetry_data _connection[organization] = _get_connection(organization, credentials) try_send_telemetry_data(organization) except Exception as ex: logger.debug(ex, exc_info=True) raise CLIError(ex) return _connection[organization] def _get_credentials(organization): pat_token_present = False if PAT_ENV_VARIABLE_NAME in os.environ or get_credential(organization) is not None: logger.debug("PAT is present which can be used against this instance") pat_token_present = True try: token_from_az_login = get_token_from_az_logins(organization, pat_token_present) if token_from_az_login: credentials = BasicAuthentication('', token_from_az_login) return credentials except BaseException as ex: # pylint: disable=broad-except logger.debug("az login is not present") logger.debug(ex, exc_info=True) if PAT_ENV_VARIABLE_NAME in os.environ: pat = os.environ[PAT_ENV_VARIABLE_NAME] logger.info("received PAT from environment variable") else: pat = get_credential(organization) if pat is not None: logger.info("Creating connection with personal access token.") credentials = BasicAuthentication('', pat) # credentials can be incorrect but they are present then it means user has already done az devops login to set # so let the user get a 401 return credentials raise get_authentication_error('Before you can run Azure DevOps commands, you need to run the login command' '(az login if using AAD/MSA identity else az devops login if using PAT token) to ' 'setup credentials.') def validate_token_for_instance(organization, credentials): logger.debug("instance recieved in validate_token_for_instance %s", organization) organization = uri_parse_instance_from_git_uri(organization) logger.debug("instance processed in validate_token_for_instance %s", organization) connection = _get_connection(organization, credentials) core_client = connection.get_client(VSTS_MODULE + 'v5_0.core.core_client.CoreClient') try: core_client.get_projects(state_filter='all', top=1, skip=0) return True except BaseException as ex2: # pylint: disable=broad-except logger.debug(ex2, exc_info=True) logger.debug("Failed to connect using provided credentials") return False def get_default_subscription_info(): """ Returns the Id, name, tenantID and environmentName of the default subscription None if no default is set or no subscription is found """ profile = Profile() dummy_user = profile.get_current_account_user() # noqa: F841 subscriptions = profile.load_cached_subscriptions(False) for subscription in subscriptions: if subscription['isDefault']: return subscription['id'], subscription['name'], subscription['tenantId'], subscription['environmentName'] logger.debug('Your account does not have a default Azure subscription. Please run \'az login\' to setup account.') return None, None, None, None def get_token_from_az_logins(organization, pat_token_present): profile = Profile() dummy_user = profile.get_current_account_user() # noqa: F841 subscriptions = profile.load_cached_subscriptions(False) tenantsDict = OrderedDict() # first loop to make sure the first identity we try with is coming from selected subscription for subscription in subscriptions: if subscription['isDefault']: tenantsDict[(subscription['tenantId'], subscription['user']['name'])] = '' for subscription in subscriptions: tenantsDict[(subscription['tenantId'], subscription['user']['name'])] = '' skipValidateToken = False if pat_token_present is False and len(tenantsDict) == 1: skipValidateToken = True try: for key, dummy_value in tenantsDict.items(): try: logger.debug('trying to get token (temp) for tenant %s and user %s ', key[0], key[1]) token = get_token_from_az_login(profile, key[0]) credentials = BasicAuthentication('', token) if skipValidateToken is True: return token if validate_token_for_instance(organization, credentials): return token logger.debug('invalid token obtained for tenant %s', key[0]) except BaseException as ex2: # pylint: disable=broad-except logger.debug(ex2) logger.debug('failed while trying to get token for tenant %s', key[0]) except BaseException as ex: # pylint: disable=broad-except logger.debug(ex) return '' def get_token_from_az_login(profile, tenant): try: raw = profile.get_raw_token( resource='499b84ac-1321-427f-aa17-267ca6975798', tenant=tenant) creds = raw[0] auth_token = creds[1] return auth_token except BaseException as ex: # pylint: disable=broad-except logger.debug('not able to get token from az login') logger.debug(ex, exc_info=True) return "" def _get_connection(organization, credentials): return Connection(get_base_url(organization), creds=credentials, user_agent='devOpsCli/{}'.format(VERSION)) def get_first_vss_instance_uri(): for key in _connection: return key def get_release_client(team_instance=None): connection = get_connection(team_instance) return connection.get_client(VSTS_MODULE + 'v5_0.release.release_client.ReleaseClient') def get_build_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.build.build_client.BuildClient') def get_new_pipeline_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_1.build.build_client.BuildClient') def get_new_pipeline_client_v60(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v6_0.pipelines.pipelines_client.PipelinesClient') def get_new_task_agent_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_1.task_agent.task_agent_client.TaskAgentClient') def get_new_cix_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_1.cix.cix_client.CixClient') def get_ci_client(organization=None): connection = get_connection(organization) return connection.get_client( VSTS_MODULE + 'v5_0.customer_intelligence.customer_intelligence_client.CustomerIntelligenceClient') def get_core_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.core.core_client.CoreClient') def get_core_client_v51(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_1.core.core_client.CoreClient') def get_wiki_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.wiki.wiki_client.WikiClient') def get_git_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.git.git_client.GitClient') def get_graph_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.graph.graph_client.GraphClient') def get_identity_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.identity.identity_client.IdentityClient') def get_service_endpoint_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.service_endpoint.service_endpoint_client.ServiceEndpointClient') def get_location_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.location.location_client.LocationClient') def get_member_entitlement_management_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.member_entitlement_management.' 'member_entitlement_management_client.MemberEntitlementManagementClient') def get_operations_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.operations.operations_client.OperationsClient') def get_policy_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.policy.policy_client.PolicyClient') def get_security_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.security.security_client.SecurityClient') def get_settings_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.settings.settings_client.SettingsClient') def get_task_agent_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.task_agent.task_agent_client.TaskAgentClient') def get_work_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.work.work_client.WorkClient') def get_work_item_tracking_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.work_item_tracking.' 'work_item_tracking_client.WorkItemTrackingClient') def get_extension_client(organization=None): connection = get_connection(organization) return connection.get_client(VSTS_MODULE + 'v5_0.extension_management.extension_management_client.ExtensionManagementClient') def get_base_url(organization): if organization is not None: return organization raise _team_organization_arg_error() def _team_organization_arg_error(): return CLIError('--organization must be specified. The value should be the URI of your Azure DevOps ' 'organization, for example: https://dev.azure.com/MyOrganization/ or your Azure DevOps Server ' 'organization. You can set a default value by running: az devops configure --defaults ' 'organization=https://dev.azure.com/MyOrganization/. For auto detection to work ' '(--detect true), you must be in a local Git directory that has a "remote" referencing a ' 'Azure DevOps or Azure DevOps Server repository.') def _raise_team_project_arg_error(): raise CLIError('--project must be specified. The value should be the ID or name of a team project. ' 'You can set a default value by running: az devops configure --defaults project=<ProjectName>.') def _raise_repo_requird_arg_error(): raise CLIError('--repository must be specified') def resolve_instance_project_and_repo( detect, organization, project=None, project_required=True, repo=None, repo_required=False): init_telemetry() vsts_tracking_data.properties[ORG_PRESENT_IN_COMMAND] = organization is not None vsts_tracking_data.properties[PROJECT_PRESENT_IN_COMMAND] = project is not None vsts_tracking_data.properties[REPO_PRESENT_IN_COMMAND] = repo is not None if organization is None: if should_detect(detect): git_info = get_vsts_info_from_current_remote_url() organization = git_info.uri vsts_tracking_data.properties[ORG_PICKED_FROM_GIT] = organization is not None if project is None: project = git_info.project vsts_tracking_data.properties[PROJECT_PICKED_FROM_GIT] = project is not None if repo is None: repo = git_info.repo vsts_tracking_data.properties[REPO_PICKED_FROM_GIT] = repo is not None if organization is None: organization = _resolve_instance_from_config(organization) vsts_tracking_data.properties[ORG_PICKED_FROM_CONFIG] = organization is not None else: orgFromConfig = _resolve_instance_from_config(organization) vsts_tracking_data.properties[ORG_IGNORED_FROM_CONFIG] = orgFromConfig is not None if project is None: project = _resolve_project_from_config(project, project_required) vsts_tracking_data.properties[PROJECT_PICKED_FROM_CONFIG] = organization is not None else: projectFromConfig = _resolve_project_from_config(project, False) vsts_tracking_data.properties[PROJECT_IGNORED_FROM_CONFIG] = projectFromConfig is not None if not is_valid_url(organization): raise _team_organization_arg_error() if project_required and project is None: _raise_team_project_arg_error() if repo_required and repo is None: _raise_repo_requird_arg_error() if not check_organization_in_azure(organization): logger.warning("The Azure DevOps Extension for the Azure CLI does not support Azure DevOps Server.") return organization, project, repo def resolve_instance_and_project(detect, organization, project=None, project_required=True): organization, project, _ = resolve_instance_project_and_repo( detect=detect, organization=organization, project=project, project_required=project_required) return organization, project def resolve_instance(detect, organization): organization, _ = resolve_instance_and_project( detect=detect, organization=organization, project_required=False) return organization def _resolve_instance_from_config(organization): from .config import azdevops_config if organization is None: if azdevops_config.has_option(DEFAULTS_SECTION, DEVOPS_ORGANIZATION_DEFAULT): organization = azdevops_config.get(DEFAULTS_SECTION, DEVOPS_ORGANIZATION_DEFAULT) if organization is None or organization == '': raise _team_organization_arg_error() return organization def _resolve_project_from_config(project, project_required=True): from .config import azdevops_config if project is None: if azdevops_config.has_option(DEFAULTS_SECTION, DEVOPS_TEAM_PROJECT_DEFAULT): project = azdevops_config.get(DEFAULTS_SECTION, DEVOPS_TEAM_PROJECT_DEFAULT) if project_required and (project is None or project == ''): _raise_team_project_arg_error() return project def get_vsts_info_from_current_remote_url(): start = datetime.datetime.now() info = VstsGitUrlInfo(get_remote_url(VstsGitUrlInfo.is_vsts_url_candidate)) end = datetime.datetime.now() duration = end - start logger.info("Detect: Url discovery took %s", str(duration)) return info def get_connection_data(organization): organization = organization.lower() if organization in _connection_data: return _connection_data[organization] location_client = get_location_client(organization) _connection_data[organization] = location_client.get_connection_data() return _connection_data[organization] def get_authentication_error(message): return CLIError(str(message) + " Please see https://aka.ms/azure-devops-cli-auth for more information.") def clear_connection_cache(): _connection.clear() def get_project_id_from_name(organization, project): if not is_uuid(project): core_client = get_core_client(organization) team_project = core_client.get_project(project_id=project) return team_project.id return project def check_organization_in_azure(organization): startsWith = organization.startswith("https://dev.azure.com/") endsWith = organization.rstrip("/").endswith(".visualstudio.com") return startsWith or endsWith _connection_data = {} _connection = OrderedDict() VSTS_MODULE = 'azext_devops.devops_sdk.'