def download_url_to_file_with_progress()

in build/fbcode_builder/getdeps/fetcher.py [0:0]


def download_url_to_file_with_progress(url: str, file_name) -> None:
    """Download ``url`` to ``file_name``, reporting progress on stdout.

    The transfer uses ``urlretrieve`` unless the ``GETDEPS_USE_LIBCURL``
    environment variable is set (to any value), in which case ``pycurl`` is
    used instead.  On a terminal the progress line is rewritten in place on
    every callback; otherwise a status fragment is written at most once
    every five seconds to avoid flooding logs.

    Args:
        url: Location to fetch.
        file_name: Path of the local file to write.

    Raises:
        TransientFailure: If the transfer fails with an ``OSError`` or, when
            pycurl is in use, with a ``pycurl.error``.
        ImportError: If ``GETDEPS_USE_LIBCURL`` is set but ``pycurl`` cannot
            be imported.
    """
    print("Download with %s -> %s ..." % (url, file_name))

    class Progress(object):
        """Formats transfer callbacks into progress output on stdout."""

        def __init__(self):
            # Time of the last throttled (non-TTY) report.  Starting at 0
            # makes the very first callback produce output.
            self.last_report = 0

        def write_update(self, total, amount):
            # -1 is the "size unknown" sentinel used by the callbacks below.
            if total == -1:
                total = "(Unknown)"

            if sys.stdout.isatty():
                sys.stdout.write("\r downloading %s of %s " % (amount, total))
            else:
                # When logging to CI logs, avoid spamming the logs and print
                # status every few seconds
                now = time.time()
                if now - self.last_report > 5:
                    sys.stdout.write(".. %s of %s " % (amount, total))
                    self.last_report = now
            sys.stdout.flush()

        def progress_pycurl(self, total, amount, _uploadtotal, _uploadamount):
            # libcurl passes 0 for a download total it does not know (yet);
            # map that onto the sentinel write_update() understands.
            self.write_update(total if total > 0 else -1, amount)

        def progress_urllib(self, count, block, total):
            amount = count * block
            if total >= 0:
                # Whole blocks are counted, so the final (partial) block
                # would otherwise report more bytes than the file contains.
                amount = min(amount, total)
            self.write_update(total, amount)

    use_libcurl = os.environ.get("GETDEPS_USE_LIBCURL") is not None
    if use_libcurl:
        import pycurl

        # pycurl.error does not derive from OSError, so it has to be listed
        # explicitly for libcurl failures to be reported as transient too.
        transient_errors = (OSError, pycurl.error)
    else:
        transient_errors = (OSError,)

    progress = Progress()
    headers = None
    start = time.time()
    try:
        if use_libcurl:
            with open(file_name, "wb") as f:
                c = pycurl.Curl()
                try:
                    c.setopt(pycurl.URL, url)
                    c.setopt(pycurl.WRITEDATA, f)
                    # display progress
                    c.setopt(pycurl.NOPROGRESS, False)
                    c.setopt(pycurl.XFERINFOFUNCTION, progress.progress_pycurl)
                    c.perform()
                finally:
                    # Release the handle even when the transfer fails.
                    c.close()
        else:
            (_filename, headers) = urlretrieve(
                url, file_name, reporthook=progress.progress_urllib
            )
    except transient_errors as exc:
        # Chain the original error so its traceback is not lost.
        raise TransientFailure(
            "Failed to download %s to %s: %s" % (url, file_name, str(exc))
        ) from exc

    end = time.time()
    sys.stdout.write(" [Complete in %f seconds]\n" % (end - start))
    sys.stdout.flush()
    # Only the urllib path yields response headers.
    if headers is not None:
        print(f"{headers}")