def run()

in azure/datalake/store/transfer.py [0:0]


    def run(self, nthreads=None, monitor=True, before_start=None):
        """Start transferring every queued file and optionally wait for completion.

        Parameters
        ----------
        nthreads : int or None
            Number of worker threads for this run. When falsy (``None`` or
            ``0``) the previously stored ``self._nthreads`` is kept.
        monitor : bool
            When true, call ``self.monitor(timeout=self._timeout)`` after all
            files have been started, then inspect every chunk in
            ``self.progress`` and raise if any chunk is not ``'finished'``.
            When false, return right after the files have been started.
        before_start : callable or None
            Optional hook invoked as ``before_start(self._adlfs, src, dst)``
            immediately before each file is started.

        Raises
        ------
        DatalakeIncompleteTransferException
            If ``monitor`` is true and at least one chunk did not reach the
            ``'finished'`` state. The message lists every offending chunk.
        """
        # Resolve the worker count BEFORE building the pool; otherwise the
        # ``nthreads`` argument would only be stored and never applied to
        # the executor used by this run.
        self._nthreads = nthreads or self._nthreads
        self._pool = ThreadPoolExecutor(self._nthreads)
        self._shutdown_event = threading.Event()
        # Fresh future registries for this run.
        # NOTE(review): presumably file-level and chunk-level futures
        # respectively -- confirm against _start.
        self._ffutures = {}
        self._cfutures = {}

        for src, dst in self._files:
            if before_start:
                before_start(self._adlfs, src, dst)
            self._start(src, dst)

        if not monitor:
            return

        self.monitor(timeout=self._timeout)

        # Every chunk that is not 'finished' counts as an error, whether or
        # not an exception was recorded for it.
        error_list = []
        for f in self.progress:
            for chunk in f.chunks:
                if chunk.state == 'finished':
                    continue
                error_string = '{} -> {}, chunk {} {}: {}'.format(
                    f.src, f.dst, chunk.name, chunk.offset, chunk.state)
                if chunk.exception:
                    # Append the exception detail only when one was captured.
                    error_string += ', {}'.format(repr(chunk.exception))
                logger.error(error_string)
                error_list.append(error_string)

        if error_list:
            raise DatalakeIncompleteTransferException('One more more exceptions occured during transfer, resulting in an incomplete transfer. \n\n List of exceptions and errors:\n {}'.format('\n'.join(error_list)))