azure-devops/azext_devops/dev/repos/_format.py (257 lines of code) (raw):

# -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from collections import OrderedDict import dateutil.parser import dateutil.tz _PR_TITLE_TRUNCATION_LENGTH = 50 _WORK_ITEM_TITLE_TRUNCATION_LENGTH = 70 def transform_repo_policies_table_output(result): table_output = [] for item in result: table_output.append(_transform_repo_policy_request_row(item)) return table_output def transform_repo_policy_table_output(result): table_output = [_transform_repo_policy_request_row(result)] return table_output def _transform_repo_policy_request_row(row): table_row = OrderedDict() table_row['ID'] = row['id'] table_row['Name'] = _get_policy_display_name(row) table_row['Is Blocking'] = row['isBlocking'] table_row['Is Enabled'] = row['isEnabled'] # this will break if policy is applied across repo but that is not possible via UI at least now table_row['Repository Id'] = row['settings']['scope'][0]['repositoryId'] if 'refName' in row['settings']['scope'][0]: table_row['Branch'] = row['settings']['scope'][0]['refName'] else: table_row['Branch'] = "All Branches" return table_row def _get_policy_display_name(row): if 'displayName' in row['settings']: return row['settings']['displayName'] return row['type']['displayName'] def transform_pull_requests_table_output(result): table_output = [] for item in result: table_output.append(_transform_pull_request_row(item)) return table_output def transform_pull_request_table_output(result): table_output = [_transform_pull_request_row(result)] return table_output def _transform_pull_request_row(row): table_row = OrderedDict() table_row['ID'] = row['pullRequestId'] table_row['Created'] = dateutil.parser.parse(row['creationDate']).astimezone(dateutil.tz.tzlocal()).date() table_row['Creator'] = row['createdBy']['uniqueName'] title = row['title'] if len(title) > _PR_TITLE_TRUNCATION_LENGTH: title = title[0:_PR_TITLE_TRUNCATION_LENGTH - 3] + '...' table_row['Title'] = title table_row['Status'] = row['status'].capitalize() table_row['IsDraft'] = str(row['isDraft']).capitalize() table_row['Repository'] = row['repository']['name'] return table_row def transform_reviewers_table_output(result): table_output = [] for item in sorted(result, key=_get_reviewer_table_key): table_output.append(_transform_reviewer_row(item)) return table_output def transform_reviewer_table_output(result): table_output = [_transform_reviewer_row(result)] return table_output def _get_reviewer_table_key(row): if row['isRequired']: key = '0' else: key = '1' key += row['displayName'].lower() return key _UNIQUE_NAME_GROUP_PREFIX = 'vstfs:///' def _transform_reviewer_row(row): table_row = OrderedDict() table_row['Name'] = row['displayName'] if row['uniqueName'][0:len(_UNIQUE_NAME_GROUP_PREFIX)] != _UNIQUE_NAME_GROUP_PREFIX: table_row['Email'] = row['uniqueName'] else: table_row['Email'] = ' ' table_row['ID'] = row['id'] table_row['Vote'] = _get_vote_from_vote_number(int(row['vote'])) if row['isRequired']: table_row['Required'] = 'True' else: table_row['Required'] = 'False' return table_row def transform_work_items_table_output(result): table_output = [] for item in result: table_output.append(_transform_work_items_row(item)) return table_output def transform_work_item_table_output(result): table_output = [_transform_work_items_row(result)] return table_output def _transform_work_items_row(row): table_row = OrderedDict() table_row['ID'] = row['id'] if 'fields' in row: if 'System.WorkItemType' in row['fields']: table_row['Type'] = row['fields']['System.WorkItemType'] else: table_row['Type'] = ' ' if 'System.AssignedTo' in row['fields']: table_row['Assigned To'] = row['fields']['System.AssignedTo'] else: table_row['Assigned To'] = ' ' if 'System.State' in row['fields']: table_row['State'] = row['fields']['System.State'] else: table_row['State'] = ' ' if 'System.Title' in row['fields']: title = row['fields']['System.Title'] if len(title) > _WORK_ITEM_TITLE_TRUNCATION_LENGTH: title = title[0:_WORK_ITEM_TITLE_TRUNCATION_LENGTH - 3] + '...' table_row['Title'] = title else: table_row['Title'] = ' ' else: table_row['Assigned To'] = ' ' table_row['State'] = ' ' table_row['Title'] = ' ' return table_row def _get_vote_from_vote_number(number): if number == 10: return 'Approved' if number == 5: return 'Approved with suggestions' if number == -5: return 'Waiting for author' if number == -10: return 'Rejected' return ' ' def transform_policies_table_output(result): from azext_devops.dev.common.identities import (ensure_display_names_in_cache, get_display_name_from_identity_id) from azext_devops.dev.common.services import get_first_vss_instance_uri table_output = [] reviewer_ids = [] for item in result: reviewer_id = get_required_reviewer_from_evaluation_row(item) if reviewer_id is not None: reviewer_ids.append(get_required_reviewer_from_evaluation_row(item)) organization = get_first_vss_instance_uri() ensure_display_names_in_cache(organization, reviewer_ids) for item in result: reviewer_id = get_required_reviewer_from_evaluation_row(item) if reviewer_id is not None: display_name = get_display_name_from_identity_id(organization, reviewer_id) else: display_name = None if display_name is not None: table_output.append(_transform_policy_row(item, display_name)) else: table_output.append(_transform_policy_row(item)) return sorted(table_output, key=_get_policy_table_key) def get_required_reviewer_from_evaluation_row(row): if 'requiredReviewerIds' in row['configuration']['settings'] and len( row['configuration']['settings']['requiredReviewerIds']) == 1: return row['configuration']['settings']['requiredReviewerIds'][0] return None def transform_policy_table_output(result): table_output = [_transform_policy_row(result)] return table_output def _get_policy_table_key(row): if row['Blocking'] == 'True': key = '0' else: key = '1' key += row['Policy'].lower() return key def _transform_policy_row(row, identity_display_name=None): table_row = OrderedDict() table_row['Evaluation ID'] = row['evaluationId'] table_row['Policy'] = _build_policy_name(row, identity_display_name) if row['configuration']['isBlocking']: table_row['Blocking'] = 'True' else: table_row['Blocking'] = 'False' table_row['Status'] = _convert_policy_status(row['status']) if row['context'] and 'isExpired' in row['context']: if row['context']['isExpired']: table_row['Expired'] = 'True' else: table_row['Expired'] = 'False' else: # Not Applicable table_row['Expired'] = ' ' if row['context'] and 'buildId' in row['context'] and row['context']['buildId'] is not None: table_row['Build ID'] = row['context']['buildId'] else: table_row['Build ID'] = ' ' return table_row def _build_policy_name(row, identity_display_name=None): policy = row['configuration']['type']['displayName'] if 'displayName' in row['configuration']['settings']\ and row['configuration']['settings']['displayName'] is not None: policy += ' (' + row['configuration']['settings']['displayName'] + ')' if 'minimumApproverCount' in row['configuration']['settings']\ and row['configuration']['settings']['minimumApproverCount'] is not None: policy += ' (' + str(row['configuration']['settings']['minimumApproverCount']) + ')' if identity_display_name is not None and 'requiredReviewerIds' in row['configuration']['settings']: if len(row['configuration']['settings']['requiredReviewerIds']) > 1: policy += ' (' + str(len(row['configuration']['settings']['requiredReviewerIds'])) + ')' elif len(row['configuration']['settings']['requiredReviewerIds']) == 1: policy += ' (' + identity_display_name + ')' return policy def _convert_policy_status(status): if status == 'queued': return ' ' return status.capitalize() def transform_refs_table_output(result): table_output = [] for item in sorted(result, key=_get_repo_key): table_output.append(_transform_ref_row(item)) return table_output def transform_ref_table_output(result): table_output = [_transform_ref_row(result)] return table_output def _transform_ref_row(row): from azext_devops.dev.common.git import get_ref_name_from_ref table_row = OrderedDict() if 'objectId' in row: table_row['Object ID'] = row['objectId'] if ('oldObjectId' in row) and ('newObjectId' in row): old_id = row['oldObjectId'] new_id = row['newObjectId'] if old_id == '0000000000000000000000000000000000000000': table_row['Object ID'] = new_id elif new_id == '0000000000000000000000000000000000000000': table_row['Object ID'] = old_id else: table_row['Old Object ID'] = old_id table_row['New Object ID'] = new_id table_row['Name'] = get_ref_name_from_ref(row['name']) table_row['Success'] = row['success'] if 'success' in row else None table_row['Update Status'] = row['updateStatus'] if 'updateStatus' in row else None return table_row def transform_repos_table_output(result): table_output = [] for item in sorted(result, key=_get_repo_key): table_output.append(_transform_repo_row(item)) return table_output def transform_repo_table_output(result): table_output = [_transform_repo_row(result)] return table_output def transform_repo_import_table_output(result): table_output = OrderedDict() table_output['Name'] = result['repository']['name'] table_output['Project'] = result['repository']['project']['name'] table_output['Import Status'] = result['status'] return table_output def _transform_repo_row(row): from azext_devops.dev.common.git import get_branch_name_from_ref table_row = OrderedDict() table_row['ID'] = row['id'] table_row['Name'] = row['name'] if row['defaultBranch']: table_row['Default Branch'] = get_branch_name_from_ref(row['defaultBranch']) else: table_row['Default Branch'] = ' ' table_row['Project'] = row['project']['name'] return table_row def _get_repo_key(repo_row): return repo_row['name']