in azure/datalake/store/transfer.py [0:0]
def run(self, nthreads=None, monitor=True, before_start=None):
    # Apply any per-call thread-count override before sizing the worker pool,
    # so the requested nthreads actually takes effect.
    self._nthreads = nthreads or self._nthreads
    self._pool = ThreadPoolExecutor(self._nthreads)
    self._shutdown_event = threading.Event()
    self._ffutures = {}
    self._cfutures = {}
    for src, dst in self._files:
        if before_start:
            before_start(self._adlfs, src, dst)
        self._start(src, dst)

    if monitor:
        self.monitor(timeout=self._timeout)
        # After monitoring completes, report every chunk that did not reach
        # the 'finished' state.
        has_errors = False
        error_list = []
        for f in self.progress:
            for chunk in f.chunks:
                if chunk.state == 'finished':
                    continue
                if chunk.exception:
                    error_string = '{} -> {}, chunk {} {}: {}, {}'.format(
                        f.src, f.dst, chunk.name, chunk.offset,
                        chunk.state, repr(chunk.exception))
                else:
                    error_string = '{} -> {}, chunk {} {}: {}'.format(
                        f.src, f.dst, chunk.name, chunk.offset,
                        chunk.state)
                logger.error(error_string)
                error_list.append(error_string)
                has_errors = True
        if has_errors:
            raise DatalakeIncompleteTransferException(
                'One or more exceptions occurred during transfer, resulting '
                'in an incomplete transfer.\n\nList of exceptions and errors:\n'
                '{}'.format('\n'.join(error_list)))
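
A minimal usage sketch of driving run() and handling an incomplete transfer. It assumes client is an already-constructed transfer client from this module with its file list populated (its construction is not shown), and log_start is an illustrative callback name; the import path follows the unqualified reference to DatalakeIncompleteTransferException in the method above.

from azure.datalake.store.transfer import DatalakeIncompleteTransferException

def log_start(adlfs, src, dst):
    # before_start receives the filesystem object plus the source and
    # destination of each file just before that file's transfer is scheduled.
    print('starting transfer: {} -> {}'.format(src, dst))

try:
    # client is assumed to be a pre-built transfer client; run() blocks while
    # monitor=True and raises if any chunk fails to finish.
    client.run(nthreads=4, monitor=True, before_start=log_start)
except DatalakeIncompleteTransferException as exc:
    # run() folds every unfinished chunk into the exception message, so the
    # caller can decide whether to retry the transfer or surface the failure.
    print('transfer did not complete:\n{}'.format(exc))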