def _update()

in azure/datalake/store/transfer.py [0:0]


    def _update(self, future):
        """
        Record the outcome of a completed chunk or file-level future.

        Used as the done-callback for merge futures submitted below, and
        also handles futures registered in ``self._cfutures`` (per-chunk
        transfers). The chunk/file state tables are updated to
        'cancelled', 'errored', 'merging' or 'finished' accordingly.

        Parameters
        ----------
        future : future object
            A completed future that must be a key of either
            ``self._cfutures`` (chunk transfer) or ``self._ffutures``
            (file-level merge).

        Raises
        ------
        ValueError
            If ``future`` is found in neither ``self._cfutures`` nor
            ``self._ffutures``.
        """
        if future in self._cfutures:
            # --- A single chunk transfer completed ---
            obj = self._cfutures[future]
            parent = self._chunks[obj]['parent']
            cstates = self._files[parent]['cstates']
            src, dst = parent

            # cancelled() is checked first; presumably these are
            # concurrent.futures futures, where exception()/result() raise
            # on a cancelled future.
            if future.cancelled():
                cstates[obj] = 'cancelled'
            else:
                error = future.exception()
                if error:
                    self._chunks[obj]['exception'] = repr(error)
                    cstates[obj] = 'errored'
                else:
                    # The chunk worker reports (bytes transferred, exception).
                    nbytes, exception = future.result()
                    self._chunks[obj]['actual'] = nbytes
                    self._chunks[obj]['exception'] = exception
                    if exception:
                        cstates[obj] = 'errored'
                    elif self._chunks[obj]['expected'] != nbytes:
                        # No error was reported, but the byte count is off:
                        # treat a short/long transfer as a failure.
                        name, offset = obj
                        cstates[obj] = 'errored'
                        exception = DatalakeIncompleteTransferException(
                            'chunk {}, offset {}: expected {} bytes, transferred {} bytes'.format(
                                name, offset, self._chunks[obj]['expected'],
                                self._chunks[obj]['actual']))
                        self._chunks[obj]['exception'] = exception
                        logger.error("Incomplete transfer: %s -> %s, %s",
                                     src, dst, repr(exception))
                    else:
                        cstates[obj] = 'finished'
                        self._update_progress(nbytes)

            if cstates.contains_all('finished'):
                logger.debug("Chunks transferred")
                if self._merge and len(cstates.objects) > 1:
                    # Several chunks: hand them, ordered by the second
                    # element of each chunk key, to the merge callable on
                    # the pool. This method runs again when it completes.
                    logger.debug("Merging file: %s", dst)
                    self._fstates[parent] = 'merging'
                    ordered_chunks = [
                        chunk for chunk, _ in sorted(cstates.objects,
                                                     key=operator.itemgetter(1))]
                    merge_future = self._pool.submit(
                        self._merge, self._adlfs, dst,
                        ordered_chunks,
                        overwrite=self._parent._overwrite,
                        shutdown_event=self._shutdown_event)
                    self._ffutures[merge_future] = parent
                    merge_future.add_done_callback(self._update)
                else:
                    suffix = '.inprogress'
                    if not self._chunked and str(dst).endswith(suffix):
                        # Strip only the trailing marker. A blanket
                        # replace() would also rewrite any earlier
                        # '.inprogress' occurrence in the path.
                        final_dst = str(dst)[:-len(suffix)]
                        logger.debug("Renaming file to remove .inprogress: %s", dst)
                        self._fstates[parent] = 'merging'
                        # NOTE(review): if _rename_file raises, the state
                        # stays 'merging' -- confirm that is acceptable.
                        self._rename_file(dst, final_dst, overwrite=self._parent._overwrite)
                        dst = final_dst

                    self._fstates[parent] = 'finished'
                    logger.info("Transferred %s -> %s", src, dst)
            elif cstates.contains_none('running', 'pending'):
                # Nothing left in flight and not every chunk finished.
                logger.error("Transfer failed: %s -> %s", src, dst)
                self._fstates[parent] = 'errored'
        elif future in self._ffutures:
            # --- A file-level merge completed ---
            src, dst = self._ffutures[future]
            key = (src, dst)

            if future.cancelled():
                self._fstates[key] = 'cancelled'
            else:
                error = future.exception()
                if error:
                    self._files[key]['exception'] = repr(error)
                    self._fstates[key] = 'errored'
                else:
                    # The merge callable returns an exception (or a falsy
                    # value on success) rather than raising it.
                    exception = future.result()
                    self._files[key]['exception'] = exception
                    if exception:
                        self._fstates[key] = 'errored'
                    else:
                        self._fstates[key] = 'finished'
                        logger.info("Transferred %s -> %s", src, dst)
        else:
            raise ValueError("Illegal state future {} not found in either file futures {} nor chunk futures {}"
                             .format(future, self._ffutures, self._cfutures))

        # TODO: Re-enable progress saving when a less IO intensive solution is available.
        # See issue: https://github.com/Azure/azure-data-lake-store-python/issues/117
        # self.save()

        if self.verbose:
            # Backspaces move the cursor back so the status line is rewritten.
            print('\b' * 200, self.status, end='')
            sys.stdout.flush()