in pipeline/common/downloads.py [0:0]
def download_chunks(self) -> Generator[bytes, None, None]:
    """
    Run the request and yield the response body in chunks, retrying on failure.

    Up to `self.total_retries` attempts are made. After a failed attempt, the next
    request sends a `Range` header starting at `self.downloaded_bytes` so that the
    download resumes instead of starting over. If the server ignores the `Range`
    header and sends the whole body again (status 200), the bytes that were already
    yielded are skipped so the consumer never receives duplicated data.

    This generator can be used directly in a for loop, or the entire class can be
    passed in as a file handle.

    Yields:
        Non-empty byte chunks read with `chunk_size=self.chunk_bytes`.

    Raises:
        DownloadException: When every attempt failed. It is chained from the last
            `requests` exception that was caught.
    """
    next_report_percent = self.report_every
    total_bytes = 0
    exception = None
    for retry in range(self.total_retries):
        if retry > 0:
            logger.error(f"Remaining retries: {self.total_retries - retry}")
        try:
            headers = {}
            if self.downloaded_bytes > 0:
                # Pick up the download from where it was before.
                headers = {"Range": f"bytes={self.downloaded_bytes}-"}
            self.response = requests.get(
                self.url, headers=headers, stream=True, timeout=self.timeout_sec
            )
            self.response.raise_for_status()

            # Work out how this response relates to the bytes already yielded.
            skip_bytes = 0  # Leading bytes of this body that were already yielded.
            range_offset = 0  # Bytes that precede this body in the full download.
            if headers:
                if self.response.status_code == 206:
                    # Partial content: the body starts at `downloaded_bytes`.
                    range_offset = self.downloaded_bytes
                elif self.response.status_code == 200:
                    # The server ignored the Range header and is sending the whole
                    # body again. Drop what has already been handed to the consumer.
                    logger.info(
                        "The server ignored the Range header, skipping the "
                        f"{self.downloaded_bytes:,} bytes that were already downloaded."
                    )
                    skip_bytes = self.downloaded_bytes

            # Report the download size.
            if not total_bytes and "content-length" in self.response.headers:
                # On a resumed (206) response the content-length only covers the
                # remaining bytes, so add back what was already downloaded.
                total_bytes = int(self.response.headers["content-length"]) + range_offset
                logger.info(f"Download size: {total_bytes:,} bytes")

            for chunk in self.response.iter_content(chunk_size=self.chunk_bytes):
                if not chunk:
                    continue

                if skip_bytes:
                    if len(chunk) <= skip_bytes:
                        skip_bytes -= len(chunk)
                        continue
                    chunk = chunk[skip_bytes:]
                    skip_bytes = 0

                self.downloaded_bytes += len(chunk)
                # Report the percentage downloaded every `report_every` percentage.
                if total_bytes and self.downloaded_bytes >= next_report_percent * total_bytes:
                    logger.info(
                        f"{self.downloaded_bytes / total_bytes * 100.0:.0f}% downloaded "
                        f"({self.downloaded_bytes}/{total_bytes} bytes)"
                    )
                    next_report_percent += self.report_every
                yield chunk

            # The download is complete.
            self.close()
            logger.info("100% downloaded - Download finished.")
            return
        except requests.exceptions.Timeout as error:
            logger.error(f"The connection timed out: {error}.")
            exception = error
        except requests.exceptions.RequestException as error:
            # The RequestException is the generic error that catches all classes of
            # "requests" errors. Don't attempt to be smart about this, just attempt
            # again until the retries are done.
            logger.error(f"A download error occurred: {error}")
            exception = error

        # Close out the response on an error. It will be recreated when retrying.
        # Compare against None explicitly: a Response is falsy when its status is
        # 4xx/5xx, which is exactly the case that still needs to be closed here.
        if self.response is not None:
            self.response.close()
            self.response = None

        # Only wait when another attempt will actually be made.
        if retry < self.total_retries - 1:
            logger.info(f"Retrying in {self.wait_before_retry_sec} sec")
            time.sleep(self.wait_before_retry_sec)

    self.close()
    raise DownloadException("The download failed.") from exception