bot/code_coverage_bot/utils.py (79 lines of code) (raw):

# -*- coding: utf-8 -*- # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import concurrent.futures import subprocess from zipfile import BadZipFile from zipfile import is_zipfile import requests import structlog import tenacity log = structlog.get_logger(__name__) def hide_secrets(text, secrets): if type(text) is bytes: encode_secret, xxx = lambda x: bytes(x, encoding="utf-8"), b"XXX" elif type(text) is str: encode_secret, xxx = lambda x: x, "XXX" else: return text for secret in secrets: if type(secret) is not str: continue text = text.replace(encode_secret(secret), xxx) return text def run_check(command, **kwargs): """ Run a command through subprocess and check for output """ assert isinstance(command, list) if len(command) == 0: raise Exception("Can't run an empty command.") _kwargs = dict( stdin=subprocess.DEVNULL, # no interactions stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) _kwargs.update(kwargs) log.info("Running command", command=" ".join(command), kwargs=_kwargs) with subprocess.Popen(command, **_kwargs) as proc: output, error = proc.communicate() if proc.returncode != 0: output = output and output.decode("utf-8") or "" error = error and error.decode("utf-8") or "" # Use error to send log to sentry log.error( f"Command failed with code: {proc.returncode}", exit=proc.returncode, command=" ".join(command), output=output.split("\n")[-1], error=error.split("\n")[-1], ) print(f"Output:\n{output}") print(f"Error:\n{error}") raise Exception(f"`{command[0]}` failed with code: {proc.returncode}.") return output class ThreadPoolExecutorResult(concurrent.futures.ThreadPoolExecutor): def __init__(self, *args, **kwargs): self.futures = [] super(ThreadPoolExecutorResult, self).__init__(*args, **kwargs) def submit(self, *args, **kwargs): future = super(ThreadPoolExecutorResult, self).submit(*args, **kwargs) self.futures.append(future) return future def __exit__(self, *args): try: for future in concurrent.futures.as_completed(self.futures): future.result() except Exception as e: for future in self.futures: future.cancel() raise e return super(ThreadPoolExecutorResult, self).__exit__(*args) def download_file(url: str, path: str) -> None: @tenacity.retry( reraise=True, wait=tenacity.wait_exponential(multiplier=1, min=16, max=64), stop=tenacity.stop_after_attempt(5), ) def perform_download() -> None: r = requests.get(url, stream=True) r.raise_for_status() with open(path, "wb") as f: for chunk in r.iter_content(chunk_size=1048576): f.write(chunk) if path.endswith(".zip") and not is_zipfile(path): raise BadZipFile("File is not a zip file") perform_download()