azure-devops/azext_devops/dev/common/artifacttool.py (96 lines of code) (raw):

# -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- import json import os from knack.log import get_logger from azext_devops.dev.common.services import _get_credentials from azext_devops.dev.common.const import ARTIFACTTOOL_PAT_ENVKEY logger = get_logger(__name__) class ArtifactToolInvoker: def __init__(self, tool_invoker, artifacttool_updater): self._tool_invoker = tool_invoker self._artifacttool_updater = artifacttool_updater def download_pipeline_artifact(self, organization, project, run_id, artifact_name, path): args = ["pipelineartifact", "download", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY, "--project", project, "--pipeline-id", run_id, "--artifact-name", artifact_name, "--path", path] return self.run_artifacttool(organization, args, "Downloading") def upload_pipeline_artifact(self, organization, project, run_id, artifact_name, path): args = ["pipelineartifact", "publish", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY, "--project", project, "--pipeline-id", run_id, "--artifact-name", artifact_name, "--path", path] return self.run_artifacttool(organization, args, "Uploading") def download_universal(self, organization, project, feed, package_name, package_version, path, file_filter): args = ["universal", "download", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY, "--feed", feed, "--package-name", package_name, "--package-version", package_version, "--path", path] if project: args.extend(["--project", project]) if file_filter: args.extend(["--filter", file_filter]) return self.run_artifacttool(organization, args, "Downloading") def publish_universal(self, organization, project, feed, package_name, package_version, description, path): args = ["universal", "publish", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY, "--feed", feed, "--package-name", package_name, "--package-version", package_version, "--path", path] if project: args.extend(["--project", project]) if description: args.extend(["--description", description]) return self.run_artifacttool(organization, args, "Publishing") def run_artifacttool(self, organization, args, initial_progress_message): # Download ArtifactTool if necessary, and return the path artifacttool_dir = self._artifacttool_updater.get_latest_artifacttool(organization) artifacttool_binary_path = os.path.join(artifacttool_dir, "artifacttool") # Populate the environment for the process with the PAT creds = _get_credentials(organization) new_env = os.environ.copy() new_env[ARTIFACTTOOL_PAT_ENVKEY] = str(creds.password) # Run ArtifactTool command_args = [artifacttool_binary_path] + args proc = self._tool_invoker.run(command_args, new_env, initial_progress_message, _process_stderr) if proc: output = proc.stdout.read().decode('utf-8') try: return json.loads(output) except ValueError: # JSONDecodeError but not available on Python 2.7 if output: logger.warning("Failed to parse the output of ArtifactTool as JSON. The output was:\n %s", output) return None def _process_stderr(line, update_progress_callback): try: json_line = json.loads(line) except BaseException as ex: # pylint: disable=broad-except json_line = None logger.warning("Failed to parse structured output from Universal Packages tooling (ArtifactTool)") logger.warning("Exception: %s", ex) logger.warning("Log line: %s", line) return _log_message(json_line) _process_event(json_line, update_progress_callback) # Interpret the structured log line from ArtifactTool and emit the message to Azure devops CLI logging def _log_message(json_line): if json_line is not None and '@m' in json_line: # Serilog doesn't emit @l for Information it seems log_level = json_line['@l'] if '@l' in json_line else "Information" message = json_line['@m'] if log_level in ["Critical", "Error"]: ex = json_line['@x'] if '@x' in json_line else None if ex: message = "{}\n{}".format(message, ex) logger.error(message) if log_level == "Warning": logger.warning(message) elif log_level == "Information": logger.info(message) else: logger.debug(message) # Inspect the structured log line for an event, and update the progress def _process_event(json_line, update_progress_callback): if json_line is not None and 'EventId' in json_line and 'Name' in json_line['EventId']: event_name = json_line['EventId']['Name'] if event_name == "ProcessingFiles": processed_files = json_line['ProcessedFiles'] total_files = json_line['TotalFiles'] percent = 100 * float(processed_files) / float(total_files) update_progress_callback("Pre-upload processing: {}/{} files" .format(processed_files, total_files), percent) if event_name == "Uploading": uploaded_bytes = json_line['UploadedBytes'] total_bytes = json_line['TotalBytes'] percent = 100 * float(uploaded_bytes) / float(total_bytes) update_progress_callback("Uploading: {}/{} bytes".format(uploaded_bytes, total_bytes), percent) if event_name == "Downloading": downloaded_bytes = json_line['DownloadedBytes'] total_bytes = json_line['TotalBytes'] percent = 100 * float(downloaded_bytes) / float(total_bytes) update_progress_callback("Downloading: {}/{} bytes".format(downloaded_bytes, total_bytes), percent)