in azure/datalake/store/multithread.py [0:0]
def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize,
              shutdown_event=None, retries=10, delay=0.01, backoff=3):
    """ Download a piece of a remote file and write locally

    Internal function used by `download`.

    Parameters
    ----------
    adlfs: object
        Filesystem handle; its ``azure`` attribute is passed to
        ``_fetch_range`` to perform the remote reads.
    src: str
        Remote path to read from.
    dst: str
        Local path to write to. It is opened in ``'rb+'`` mode, so the file
        must already exist.
    offset: int
        Byte position of the piece. Used both as the first remote byte to
        request and as the seek position in ``dst``.
    size: int
        Number of bytes that make up the piece.
    buffersize: int
        Not used by this function; kept so the signature stays compatible
        with existing callers.
    blocksize: int
        Maximum number of bytes requested from the server per call.
    shutdown_event: object with ``is_set()``, optional
        Checked after every fetch. When set, the function stops and returns
        ``(0, None)``; bytes already written are not counted.
    retries, delay, backoff:
        Forwarded to ``ExponentialRetryPolicy`` as ``max_retries``,
        ``exponential_retry_interval`` and ``exponential_factor``.

    Returns
    -------
    (int, Exception or None)
        The number of bytes accounted as downloaded and ``None`` on success.
        On any failure the second item is a ``RuntimeError`` wrapping the
        original exception; exceptions are returned, not raised. For a short
        read the count holds the bytes that were written before the server
        stopped returning data.
    """
    total_bytes_downloaded = 0
    retry_policy = ExponentialRetryPolicy(max_retries=retries, exponential_retry_interval=delay,
                                          exponential_factor=backoff)
    # One session id is shared by every range request of this piece.
    filesessionid = str(uuid.uuid4())
    try:
        nbytes = 0
        start = offset
        end_of_piece = offset + size

        with open(dst, 'rb+') as fout:
            fout.seek(start)
            while start < end_of_piece:
                with closing(_fetch_range(adlfs.azure, src, start=start,
                                          end=min(start + blocksize, end_of_piece), stream=True,
                                          retry_policy=retry_policy, filesessionid=filesessionid)) as response:
                    chunk = response.content
                    if shutdown_event and shutdown_event.is_set():
                        return total_bytes_downloaded, None
                    if not chunk:
                        # An empty response never advances `start`, so continuing
                        # would repeat the same request forever. Stop reading and
                        # let the short-read check below report the shortfall.
                        break
                    nwritten = fout.write(chunk)
                    if not nwritten:
                        raise IOError("Failed to write to disk for {0} at location {1} with blocksize {2}".format(dst, start, blocksize))
                    nbytes += nwritten
                    start += nwritten
        logger.debug('Downloaded %s bytes to %s, byte offset %s', nbytes, dst, offset)

        # There are certain cases where we will be throttled and receive less than the expected amount of data.
        # In these cases we report the bytes that did arrive together with an error, so the caller can see that
        # the piece is incomplete. Receiving more data than requested is reported as an error with no bytes
        # counted, because it is not clear how to proceed in that case.
        if nbytes < size:
            errMsg = 'Did not recieve total bytes requested from server. This can be due to server side throttling and will be retried. Data Expected: {}. Data Received: {}.'.format(size, nbytes)
            total_bytes_downloaded += nbytes
            raise IOError(errMsg)
        if nbytes > size:
            raise IOError('Received more bytes than expected from the server. Expected: {}. Received: {}.'.format(size, nbytes))

        total_bytes_downloaded += nbytes
        return total_bytes_downloaded, None
    except Exception as e:
        # Failures are handed back to the caller instead of being raised.
        logger.debug('Exception %s on ADL download on attempt', repr(e))
        exception = RuntimeError('Max number of ADL retries exceeded: exception ' + repr(e))
        logger.error('Download failed %s; %s', dst, repr(exception))
        return total_bytes_downloaded, exception