in azure/datalake/store/transfer.py [0:0]
def _update(self, future):
    """Done-callback for both chunk futures and file (merge) futures.

    Dispatches on which registry the completed ``future`` belongs to:

    * ``self._cfutures`` -- a chunk transfer finished: record its outcome in
      the chunk's state container and, once every chunk of the parent file is
      done, either submit a merge job or finalize the file.
    * ``self._ffutures`` -- a file-level (merge) future finished: record the
      file's final state.

    Raises ``ValueError`` if ``future`` is in neither registry, since that
    indicates an internal bookkeeping error.

    NOTE(review): ``cstates`` appears to be a project state-tracking object
    supporting item assignment plus ``contains_all``/``contains_none``/
    ``objects`` -- confirm against its definition elsewhere in this module.
    """
    if future in self._cfutures:
        # --- chunk future completed ---
        obj = self._cfutures[future]                  # obj is a (name, offset) pair, per the unpack below
        parent = self._chunks[obj]['parent']
        cstates = self._files[parent]['cstates']      # per-chunk state tracker for this file
        src, dst = parent                             # parent key is a (source, destination) pair
        if future.cancelled():
            cstates[obj] = 'cancelled'
        elif future.exception():
            # Future raised: store a repr (not the exception object) and mark errored.
            self._chunks[obj]['exception'] = repr(future.exception())
            cstates[obj] = 'errored'
        else:
            # Future succeeded; the chunk worker returns (bytes_transferred, exception).
            nbytes, exception = future.result()
            self._chunks[obj]['actual'] = nbytes
            self._chunks[obj]['exception'] = exception
            if exception:
                cstates[obj] = 'errored'
            elif self._chunks[obj]['expected'] != nbytes:
                # Transferred byte count disagrees with the expected chunk size:
                # treat as an error even though the worker reported success.
                name, offset = obj
                cstates[obj] = 'errored'
                exception = DatalakeIncompleteTransferException(
                    'chunk {}, offset {}: expected {} bytes, transferred {} bytes'.format(
                        name, offset, self._chunks[obj]['expected'],
                        self._chunks[obj]['actual']))
                self._chunks[obj]['exception'] = exception
                logger.error("Incomplete transfer: %s -> %s, %s",
                             src, dst, repr(exception))
            else:
                cstates[obj] = 'finished'
                self._update_progress(nbytes)
        if cstates.contains_all('finished'):
            # Every chunk of this file transferred successfully.
            logger.debug("Chunks transferred")
            if self._merge and len(cstates.objects) > 1:
                # Multiple chunks and a merge callable configured: submit the
                # merge as a new future and re-register this method as its
                # done-callback (it will land in the _ffutures branch below).
                logger.debug("Merging file: %s", self._fstates[parent])
                self._fstates[parent] = 'merging'
                merge_future = self._pool.submit(
                    self._merge, self._adlfs, dst,
                    # Chunks sorted by offset (element 1 of each (chunk, offset) pair)
                    # so the merged file is assembled in order.
                    [chunk for chunk, _ in sorted(cstates.objects,
                                                  key=operator.itemgetter(1))],
                    overwrite=self._parent._overwrite,
                    shutdown_event=self._shutdown_event)
                self._ffutures[merge_future] = parent
                merge_future.add_done_callback(self._update)
            else:
                # Single chunk (or no merge step): finalize the file directly.
                if not self._chunked and str(dst).endswith('.inprogress'):
                    # Non-chunked uploads write to a temporary '.inprogress'
                    # name; rename to the final destination now.
                    logger.debug("Renaming file to remove .inprogress: %s", self._fstates[parent])
                    self._fstates[parent] = 'merging'
                    self._rename_file(dst, dst.replace('.inprogress',''), overwrite=self._parent._overwrite)
                    dst = dst.replace('.inprogress', '')
                self._fstates[parent] = 'finished'
                logger.info("Transferred %s -> %s", src, dst)
        elif cstates.contains_none('running', 'pending'):
            # No chunk is still in flight and not all finished: the file failed.
            logger.error("Transfer failed: %s -> %s", src, dst)
            self._fstates[parent] = 'errored'
    elif future in self._ffutures:
        # --- file-level (merge) future completed ---
        src, dst = self._ffutures[future]
        if future.cancelled():
            self._fstates[(src, dst)] = 'cancelled'
        elif future.exception():
            self._files[(src, dst)]['exception'] = repr(future.exception())
            self._fstates[(src, dst)] = 'errored'
        else:
            # Merge worker returns an exception value (None on success).
            exception = future.result()
            self._files[(src, dst)]['exception'] = exception
            if exception:
                self._fstates[(src, dst)] = 'errored'
            else:
                self._fstates[(src, dst)] = 'finished'
                logger.info("Transferred %s -> %s", src, dst)
        # TODO: Re-enable progress saving when a less IO intensive solution is available.
        # See issue: https://github.com/Azure/azure-data-lake-store-python/issues/117
        #self.save()
    else:
        raise ValueError("Illegal state future {} not found in either file futures {} nor chunk futures {}"
                         .format(future, self._ffutures, self._cfutures))
    if self.verbose:
        # Rewind the console line with backspaces and reprint current status.
        print('\b' * 200, self.status, end='')
        sys.stdout.flush()