azure-devops/azext_devops/dev/common/identities.py (99 lines of code) (raw):
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from knack.util import CLIError
from .file_cache import get_cli_cache
from .uuid import is_uuid
from .services import get_connection_data, get_identity_client
def resolve_identity_as_id(identity_filter, organization):
    """Resolve an identity name, email, alias, or id to the identity's id.

    :param identity_filter: Name, email, alias, or id. None and values accepted by
        is_uuid are returned unchanged; 'me' (case-insensitive) selects the
        current identity.
    :param organization: Organization handed to the identity lookups.
    :return: The id of the resolved identity, the filter itself when it is None
        or already a uuid, or None when resolve_identity returns no identity.
    """
    # Nothing to resolve: either no filter was given or it already is an id.
    if identity_filter is None or is_uuid(identity_filter):
        return identity_filter
    if identity_filter.lower() == ME:
        current = get_current_identity(organization)
        return current.id
    resolved = resolve_identity(identity_filter, organization)
    return None if resolved is None else resolved.id
def resolve_identity_as_identity_descriptor(identity_filter, organization):
    """Takes an identity name, email, alias, or id, and returns the identity descriptor.

    :param identity_filter: Name, email, alias, or id. The keyword 'me'
        (case-insensitive) selects the current identity.
    :param organization: Organization handed to the identity lookups.
    :return: The ``descriptor`` attribute of the resolved identity, or None when
        identity_filter is None or resolve_identity returns no identity.
    :raises CLIError: Propagated from resolve_identity when the filter matches
        no identity or matches several that cannot be disambiguated.
    """
    if identity_filter is None:
        return None
    if identity_filter.lower() == ME:
        return get_current_identity(organization).descriptor
    identity = resolve_identity(identity_filter, organization)
    return identity.descriptor if identity is not None else None
def resolve_identity_as_display_name(identity_filter, organization):
    """Resolve an identity name, email, alias, or id to a display name.

    :param identity_filter: Name, email, alias, or id. The keyword 'me'
        (case-insensitive) selects the current identity.
    :param organization: Organization handed to the identity lookups.
    :return: None when resolve_identity yields no identity. For 'me', the
        ``provider_display_name`` of the current identity; otherwise the result
        of get_display_name_from_identity for the resolved identity.
    """
    resolved = resolve_identity(identity_filter, organization)
    if resolved is None:
        return None
    # The 'me' keyword reports the provider display name of the current identity.
    if identity_filter.lower() == ME:
        current = get_current_identity(organization)
        return current.provider_display_name
    return get_display_name_from_identity(resolved)
def _has_domain(identity):
    """Return True when *identity* has a 'Domain' property that carries a '$value' entry."""
    properties = identity.properties
    return 'Domain' in properties and '$value' in properties['Domain']


def resolve_identity(identity_filter, organization):
    """Takes an identity name, email, alias, or id, and returns the identity.

    :param identity_filter: Name, email, alias, or id to look up. The keyword
        'me' (case-insensitive) selects the currently authenticated identity.
    :param organization: Organization handed to the identity client and to the
        current-identity lookup.
    :return: The matching identity, or None when identity_filter is None.
    :raises CLIError: When no identity matches, or when several match and the
        current user's domain does not single out exactly one of them.
    """
    if identity_filter is None:
        return None
    if identity_filter.lower() == ME:
        return get_current_identity(organization)

    identity_client = get_identity_client(organization)
    # A filter containing a space or an '@' (after its first character) is tried
    # with the 'General' search first; anything else is tried as a directory
    # alias first. The other search acts as the fallback.
    if identity_filter.find(' ') > 0 or identity_filter.find('@') > 0:
        search_order = ('General', 'DirectoryAlias')
    else:
        search_order = ('DirectoryAlias', 'General')

    identities = None
    for search_filter in search_order:
        identities = identity_client.read_identities(search_filter=search_filter,
                                                     filter_value=identity_filter)
        if identities:
            break
    if not identities:
        raise CLIError('Could not resolve identity: ' + identity_filter)

    if len(identities) > 1:
        # prefer users with same domain
        identities_with_tenant = []
        candidates = [identity for identity in identities if _has_domain(identity)]
        if candidates:
            # Look the current user up once rather than once per candidate; the
            # lookup is still skipped when no candidate carries a domain.
            current_user = get_current_identity(organization)
            if _has_domain(current_user):
                current_domain = current_user.properties['Domain']['$value']
                identities_with_tenant = [
                    identity for identity in candidates
                    if identity.properties['Domain']['$value'] == current_domain]
        if len(identities_with_tenant) == 1:
            return identities_with_tenant[0]
        raise CLIError('There are multiple identities found for "' + identity_filter + '" '
                       'Please provide a more specific identifier for this identity.')
    return identities[0]
def get_current_identity(organization):
    """Return the ``authenticated_user`` of the connection data for *organization*."""
    connection_data = get_connection_data(organization)
    return connection_data.authenticated_user
def get_identities(organization, identity_ids):
    """Read identities by id through the identity client of *organization*.

    :param organization: Organization used to obtain the identity client.
    :param identity_ids: Value forwarded as the ``identity_ids`` argument of
        ``read_identities``.
    :return: Whatever ``read_identities`` returns for those ids.
    """
    return get_identity_client(organization).read_identities(identity_ids=identity_ids)
def ensure_display_names_in_cache(organization, identity_ids):
    """Populate the display-name cache for any of *identity_ids* it lacks.

    Ids whose cache entry is falsy are looked up together in a single
    get_identities call (joined with commas), and the display name of every
    identity returned is stored in the cache under that identity's id.

    :param organization: Organization used for the identity lookup.
    :param identity_ids: Iterable of identity id strings.
    """
    missing_ids = [candidate for candidate in identity_ids
                   if not _display_name_cache[candidate]]
    if not missing_ids:
        return
    for resolved in get_identities(organization, ','.join(missing_ids)):
        _display_name_cache[resolved.id] = get_display_name_from_identity(resolved)
def get_display_name_from_identity_id(organization, identity_id):
    """Return the cached display name for *identity_id*, looking it up if needed.

    :param organization: Organization used when the name has to be looked up.
    :param identity_id: Id of the identity whose display name is wanted.
    :return: The display name from the cache, or None when the cache entry is
        still falsy after the lookup.
    """
    if not _display_name_cache[identity_id]:
        ensure_display_names_in_cache(organization, [identity_id])
    cached_name = _display_name_cache[identity_id]
    return cached_name if cached_name else None
def get_display_name_from_identity(identity):
    """Return the display name to show for *identity*.

    :param identity: Object exposing ``custom_display_name`` and
        ``provider_display_name`` attributes.
    :return: ``custom_display_name`` when it is neither None nor the empty
        string; otherwise ``provider_display_name``.
    """
    custom_name = identity.custom_display_name
    if custom_name is None or custom_name == '':
        return identity.provider_display_name
    return custom_name
def get_account_from_identity(identity):
    """Return the account value of *identity*, falling back to its provider display name.

    :param identity: Object exposing a ``properties`` mapping and a
        ``provider_display_name`` attribute.
    :return: ``properties['Account']['$value']`` when both keys are present;
        otherwise ``provider_display_name``.
    """
    props = identity.properties
    if 'Account' not in props or '$value' not in props['Account']:
        return identity.provider_display_name
    return props['Account']['$value']
def get_identity_descriptor_from_subject_descriptor(subject_descriptor, organization):
    """Translate a subject descriptor into an identity descriptor.

    :param subject_descriptor: Value forwarded as the ``subject_descriptors``
        argument of ``read_identities``.
    :param organization: Organization used to obtain the identity client.
    :return: The ``descriptor`` of the first identity returned, or
        *subject_descriptor* unchanged when the lookup returns nothing.
    """
    client = get_identity_client(organization)
    matches = client.read_identities(subject_descriptors=subject_descriptor)
    if not matches:
        return subject_descriptor
    return matches[0].descriptor
# Keyword that stands for the current identity; the resolve_* functions in this
# module compare it against the lower-cased identity filter.
ME = 'me'
# Cache of display names keyed by identity id, obtained from get_cli_cache.
# NOTE(review): the second argument is presumably a maximum age in seconds
# (3600 * 6 = 6 hours) - confirm against get_cli_cache.
_display_name_cache = get_cli_cache('identity_display_names', 3600 * 6)