azure-devops/azext_devops/dev/pipelines/_format.py (302 lines of code) (raw):
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from collections import OrderedDict
import dateutil
# Longest value/description text (in characters) shown in table output; longer
# text is cut to this length and suffixed with '...'.
_VALUE_TRUNCATION_LENGTH = 80
def transform_builds_table_output(result):
    """Return one table row per build dictionary in ``result``."""
    return [_transform_build_row(build) for build in result]
def transform_build_table_output(result):
    """Return a one-row table for a single build dictionary."""
    build_row = _transform_build_row(result)
    return [build_row]
def _transform_build_row(row):
    """Map one build dictionary onto an ordered table row.

    Columns, in order: ID, Number, Status, Result, Definition ID,
    Definition Name, Source Branch, Queued Time, Reason.

    A falsy ``result`` or ``sourceBranch`` is rendered as a single space.
    A leading ``REF_HEADS_PREFIX`` is removed from the source branch, and the
    queue time is parsed and converted to the local time zone.
    """
    # Import the submodules explicitly: a bare ``import dateutil`` does not
    # guarantee that ``dateutil.parser`` and ``dateutil.tz`` are loaded.
    import dateutil.parser
    import dateutil.tz
    from azext_devops.dev.common.git import REF_HEADS_PREFIX

    table_row = OrderedDict()
    table_row['ID'] = row['id']
    table_row['Number'] = row['buildNumber']
    table_row['Status'] = row['status']
    table_row['Result'] = row['result'] if row['result'] else ' '
    table_row['Definition ID'] = row['definition']['id']
    table_row['Definition Name'] = row['definition']['name']

    source_branch = row['sourceBranch']
    if source_branch:
        # Show the short branch name rather than the full ref.
        if source_branch.startswith(REF_HEADS_PREFIX):
            source_branch = source_branch[len(REF_HEADS_PREFIX):]
        table_row['Source Branch'] = source_branch
    else:
        table_row['Source Branch'] = ' '

    queued_time = dateutil.parser.parse(row['queueTime']).astimezone(dateutil.tz.tzlocal())
    table_row['Queued Time'] = str(queued_time.date()) + ' ' + str(queued_time.time())
    table_row['Reason'] = row['reason']
    return table_row
def transform_build_tags_output(result):
    """Return one table row per tag in ``result``."""
    return [_transform_build_tags_row(tag) for tag in result]
def _transform_build_tags_row(row):
table_row = OrderedDict()
table_row['Tags'] = row
return table_row
def transform_definitions_table_output(result):
    """Return one table row per definition in ``result``.

    The ``Draft`` column is included for every row as soon as at least one
    definition has ``quality == 'draft'``.
    """
    show_draft = any(definition['quality'] == 'draft' for definition in result)
    return [_transform_definition_row(definition, show_draft) for definition in result]
def transform_definition_table_output(result):
    """Return a one-row table for a single definition dictionary.

    The ``Draft`` column is only included when the definition is a draft.
    """
    is_draft = result['quality'] == 'draft'
    return [_transform_definition_row(result, is_draft)]
def _transform_definition_row(row, include_draft_column=False):
table_row = OrderedDict()
table_row['ID'] = row['id']
table_row['Name'] = row['name']
if include_draft_column:
if row['quality'] == 'draft':
table_row['Draft'] = True
else:
table_row['Draft'] = ' '
if row['queueStatus']:
table_row['Status'] = row['queueStatus']
else:
table_row['Status'] = ' '
if row['queue']:
table_row['Default Queue'] = row['queue']['name']
else:
table_row['Default Queue'] = ' '
return table_row
def _get_task_key(row):
return row['name'].lower()
def transform_releases_table_output(result):
    """Return one table row per release dictionary in ``result``."""
    return [_transform_release_row(release) for release in result]
def transform_release_table_output(result):
    """Return a one-row table for a single release dictionary."""
    release_row = _transform_release_row(result)
    return [release_row]
def _transform_release_row(row):
    """Map one release dictionary onto an ordered table row.

    Columns, in order: ID, Name, Definition Name, Created By, Created On,
    Status, Description. ``createdOn`` is parsed and converted to the local
    time zone before being formatted as ``<date> <time>``.
    """
    # Import the submodules explicitly: a bare ``import dateutil`` does not
    # guarantee that ``dateutil.parser`` and ``dateutil.tz`` are loaded.
    import dateutil.parser
    import dateutil.tz

    table_row = OrderedDict()
    table_row['ID'] = row['id']
    table_row['Name'] = row['name']
    table_row['Definition Name'] = row['releaseDefinition']['name']
    table_row['Created By'] = row['createdBy']['displayName']
    created_on = dateutil.parser.parse(row['createdOn']).astimezone(dateutil.tz.tzlocal())
    table_row['Created On'] = str(created_on.date()) + ' ' + str(created_on.time())
    table_row['Status'] = row['status']
    table_row['Description'] = row['description']
    return table_row
def transform_release_definitions_table_output(result):
    """Return one table row per release definition in ``result``."""
    return [_transform_release_definition_row(definition) for definition in result]
def transform_release_definition_table_output(result):
    """Return a one-row table for a single release definition dictionary."""
    definition_row = _transform_release_definition_row(result)
    return [definition_row]
def _transform_release_definition_row(row):
table_row = OrderedDict()
table_row['ID'] = row['id']
table_row['Name'] = row['name']
table_row['CreatedBy'] = row['createdBy']['displayName']
table_row['Created On'] = row['createdOn']
return table_row
def transform_runs_artifact_table_output(result):
    """Return one table row per artifact dictionary in ``result``."""
    return [_transform_runs_artifact_row(artifact) for artifact in result]
def _transform_runs_artifact_row(row):
table_row = OrderedDict()
table_row['ID'] = row['id']
table_row['Name'] = row['name']
table_row['Type'] = row['resource']['type']
return table_row
def transform_pipelines_table_output(result):
    """Return one table row per pipeline in ``result``.

    The ``Draft`` column is included for every row as soon as at least one
    pipeline has ``quality == 'draft'``.
    """
    show_draft = any(pipeline['quality'] == 'draft' for pipeline in result)
    return [_transform_pipeline_row(pipeline, show_draft) for pipeline in result]
def transform_pipeline_table_output(result):
    """Return a one-row table for a single pipeline dictionary.

    The ``Draft`` column is only included when the pipeline is a draft.
    """
    is_draft = result['quality'] == 'draft'
    return [_transform_pipeline_row(result, is_draft)]
def _transform_pipeline_row(row, include_draft_column=False):
table_row = OrderedDict()
table_row['ID'] = row['id']
path = row['path']
table_row['Path'] = path[:50] + '..' if len(path) > 50 else path
name = row['name']
table_row['Name'] = name[:50] + '..' if len(name) > 50 else name
if include_draft_column:
if row['quality'] == 'draft':
table_row['Draft'] = True
else:
table_row['Draft'] = ' '
if row['queueStatus']:
table_row['Status'] = row['queueStatus']
else:
table_row['Status'] = ' '
if row['queue']:
table_row['Default Queue'] = row['queue']['name']
else:
table_row['Default Queue'] = ' '
return table_row
def transform_pipeline_runs_table_output(result):
    """Return one table row per pipeline run dictionary in ``result``."""
    return [_transform_pipeline_run_row(run) for run in result]
def transform_pipeline_or_run_table_output(result):
    """Return a one-row table for an object that is either a pipeline or a run."""
    single_row = _transform_pipeline_or_run_row(result)
    return [single_row]
def _transform_pipeline_or_run_row(row):
    """Format ``row`` as a run row or as a pipeline (definition) row.

    HACK: a truthy "buildNumber" entry is used to tell a run apart from a
    definition.
    """
    looks_like_run = bool(row.get("buildNumber"))
    transform = _transform_pipeline_run_row if looks_like_run else _transform_pipeline_row
    return transform(row)
def transform_pipeline_run_table_output(result):
    """Return a one-row table for a single pipeline run dictionary."""
    run_row = _transform_pipeline_run_row(result)
    return [run_row]
def _transform_pipeline_run_row(row):
    """Map one pipeline run dictionary onto an ordered table row.

    Columns, in order: Run ID, Number, Status, Result, Pipeline ID,
    Pipeline Name, Source Branch, Queued Time, Reason.

    A falsy ``result`` or ``sourceBranch`` is rendered as a single space.
    A leading ``REF_HEADS_PREFIX`` is removed from the source branch, and the
    queue time is parsed and converted to the local time zone.
    """
    # Import the submodules explicitly: a bare ``import dateutil`` does not
    # guarantee that ``dateutil.parser`` and ``dateutil.tz`` are loaded.
    import dateutil.parser
    import dateutil.tz
    from azext_devops.dev.common.git import REF_HEADS_PREFIX

    table_row = OrderedDict()
    table_row['Run ID'] = row['id']
    table_row['Number'] = row['buildNumber']
    table_row['Status'] = row['status']
    table_row['Result'] = row['result'] if row['result'] else ' '
    table_row['Pipeline ID'] = row['definition']['id']
    table_row['Pipeline Name'] = row['definition']['name']

    source_branch = row['sourceBranch']
    if source_branch:
        # Show the short branch name rather than the full ref.
        if source_branch.startswith(REF_HEADS_PREFIX):
            source_branch = source_branch[len(REF_HEADS_PREFIX):]
        table_row['Source Branch'] = source_branch
    else:
        table_row['Source Branch'] = ' '

    queued_time = dateutil.parser.parse(row['queueTime']).astimezone(dateutil.tz.tzlocal())
    table_row['Queued Time'] = str(queued_time.date()) + ' ' + str(queued_time.time())
    table_row['Reason'] = row['reason']
    return table_row
def transform_pipelines_pools_table_output(result):
    """Return one table row per pool dictionary in ``result``."""
    return [_transform_pipeline_pool_row(pool) for pool in result]
def transform_pipelines_pool_table_output(result):
    """Return a one-row table for a single pool dictionary."""
    pool_row = _transform_pipeline_pool_row(result)
    return [pool_row]
def _transform_pipeline_pool_row(row):
table_row = OrderedDict()
table_row['ID'] = row['id']
table_row['Name'] = row['name']
table_row['Is Hosted'] = row['isHosted']
table_row['Pool Type'] = row['poolType']
return table_row
def transform_pipelines_agents_table_output(result):
    """Return one table row per agent dictionary in ``result``."""
    return [_transform_pipeline_agent_row(agent) for agent in result]
def transform_pipelines_agent_table_output(result):
    """Return a one-row table for a single agent dictionary."""
    agent_row = _transform_pipeline_agent_row(result)
    return [agent_row]
def _transform_pipeline_agent_row(row):
table_row = OrderedDict()
table_row['ID'] = row['id']
table_row['Name'] = row['name']
table_row['Is Enabled'] = row['enabled']
table_row['Status'] = row['status']
table_row['Version'] = row['version']
return table_row
def transform_pipelines_queues_table_output(result):
    """Return one table row per queue dictionary in ``result``."""
    return [_transform_pipeline_queue_row(queue) for queue in result]
def transform_pipelines_queue_table_output(result):
    """Return a one-row table for a single queue dictionary."""
    queue_row = _transform_pipeline_queue_row(result)
    return [queue_row]
def _transform_pipeline_queue_row(row):
table_row = OrderedDict()
table_row['ID'] = row['id']
table_row['Name'] = row['name']
table_row['Pool IsHosted'] = row['pool']['isHosted']
table_row['Pool Type'] = row['pool']['poolType']
return table_row
def transform_pipelines_variable_groups_table_output(result):
    """Return one table row per variable group dictionary in ``result``."""
    return [_transform_pipeline_variable_group_row(group) for group in result]
def transform_pipelines_variable_group_table_output(result):
    """Return a one-row table for a single variable group dictionary."""
    group_row = _transform_pipeline_variable_group_row(result)
    return [group_row]
def _transform_pipeline_variable_group_row(row):
table_row = OrderedDict()
table_row['ID'] = row['id']
table_row['Name'] = row['name']
table_row['Type'] = row['type']
table_row['Description'] = row['description']
if row.get('authorized', None) is not None:
table_row['Is Authorized'] = row['authorized']
table_row['Number of Variables'] = len(row['variables'])
return table_row
def transform_pipelines_variables_table_output(result):
    """Return one table row per ``name -> variable`` entry of the ``result`` mapping."""
    return [
        _transform_pipeline_variable_row(name, details)
        for name, details in result.items()
    ]
def _transform_pipeline_variable_row(key, value):
    """Map one pipeline variable onto an ordered table row.

    Columns, in order: Name, Allow Override, Is Secret, Value. The two flags
    are rendered as the strings ``'True'``/``'False'``. A ``None`` value is
    shown as an empty string, and a value longer than
    ``_VALUE_TRUNCATION_LENGTH`` is cut to that length and suffixed with
    ``'...'``.
    """
    table_row = OrderedDict([
        ('Name', key),
        ('Allow Override', 'True' if value['allowOverride'] else 'False'),
        ('Is Secret', 'True' if value['isSecret'] else 'False'),
    ])
    shown = value['value']
    if shown is None:
        shown = ''
    if len(shown) > _VALUE_TRUNCATION_LENGTH:
        shown = shown[:_VALUE_TRUNCATION_LENGTH] + '...'
    table_row['Value'] = shown
    return table_row
def transform_pipelines_var_group_variables_table_output(result):
    """Return one table row per ``name -> variable`` entry of the ``result`` mapping."""
    return [
        _transform_pipeline_var_group_variable_row(name, details)
        for name, details in result.items()
    ]
def _transform_pipeline_var_group_variable_row(key, value):
    """Map one variable-group variable onto an ordered table row.

    Columns, in order: Name, Is Secret, Value. The flag is rendered as the
    string ``'True'``/``'False'``. A ``None`` value is shown as an empty
    string, and a value longer than ``_VALUE_TRUNCATION_LENGTH`` is cut to
    that length and suffixed with ``'...'``.
    """
    table_row = OrderedDict([
        ('Name', key),
        ('Is Secret', 'True' if value['isSecret'] else 'False'),
    ])
    shown = value['value']
    if shown is None:
        shown = ''
    if len(shown) > _VALUE_TRUNCATION_LENGTH:
        shown = shown[:_VALUE_TRUNCATION_LENGTH] + '...'
    table_row['Value'] = shown
    return table_row
def transform_pipelines_folders_table_output(result):
    """Return one table row per folder dictionary in ``result``."""
    return [_transform_pipeline_folder_row(folder) for folder in result]
def transform_pipelines_folder_table_output(result):
    """Return a one-row table for a single folder dictionary."""
    folder_row = _transform_pipeline_folder_row(result)
    return [folder_row]
def _transform_pipeline_folder_row(row):
    """Map one folder dictionary onto an ordered table row.

    Columns, in order: Path, Description. A ``None`` description is shown as
    an empty string, and a description longer than
    ``_VALUE_TRUNCATION_LENGTH`` is cut to that length and suffixed with
    ``'...'``.
    """
    table_row = OrderedDict([('Path', row['path'])])
    description = row['description']
    if description is None:
        description = ''
    if len(description) > _VALUE_TRUNCATION_LENGTH:
        description = description[:_VALUE_TRUNCATION_LENGTH] + '...'
    table_row['Description'] = description
    return table_row