def _copy_dir()

in luigi/contrib/s3.py


    def _copy_dir(self, source_path, destination_path, threads=DEFAULT_THREADS,
                  start_time=None, end_time=None, part_size=DEFAULT_PART_SIZE, **kwargs):
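        """
        Copy every key under source_path to the same relative location under
        destination_path, scheduling the copies across a thread pool.

        threads sizes both the scheduling pool and boto3's per-copy
        max_concurrency; start_time/end_time are passed through to self.list
        to restrict which keys are considered; part_size sets the multipart
        chunk size; remaining kwargs are forwarded to boto3's copy as
        ExtraArgs.

        Returns a (total_keys, total_size_bytes) tuple for the copied keys.
        """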
        start = datetime.datetime.now()
        copy_jobs = []
        management_pool = ThreadPool(processes=threads)
        transfer_config = TransferConfig(max_concurrency=threads, multipart_chunksize=part_size)
        src_bucket, src_key = self._path_to_bucket_and_key(source_path)
        dst_bucket, dst_key = self._path_to_bucket_and_key(destination_path)
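        # Normalize both prefixes to end with '/' so relative key paths can
        # be sliced off the source and re-rooted under the destination.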
        src_prefix = self._add_path_delimiter(src_key)
        dst_prefix = self._add_path_delimiter(dst_key)
        key_path_len = len(src_prefix)
        total_size_bytes = 0
        total_keys = 0
        for item in self.list(source_path, start_time=start_time, end_time=end_time, return_key=True):
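            # Relative path of this key under the source prefix.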
            path = item.key[key_path_len:]
            # prevents copy attempt of empty key in folder
            if path not in ('', '/'):
                total_keys += 1
                total_size_bytes += item.size
                copy_source = {
                    'Bucket': src_bucket,
                    'Key': src_prefix + path
                }
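                # Reuse the shared transfer config and forward caller kwargs
                # (e.g. ACL or metadata directives) as boto3 ExtraArgs.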
                the_kwargs = {'Config': transfer_config, 'ExtraArgs': kwargs}
                job = management_pool.apply_async(self.s3.meta.client.copy,
                                                  args=(copy_source, dst_bucket, dst_prefix + path),
                                                  kwds=the_kwargs)
                copy_jobs.append(job)
        # Close the pool and wait for every scheduled copy to finish
        management_pool.close()
        management_pool.join()
        # Re-raise any errors encountered in the copy threads
        for result in copy_jobs:
            result.get()
        end = datetime.datetime.now()
        duration = end - start
        logger.info('%s : Complete : %s total keys copied in %s',
                    datetime.datetime.now(), total_keys, duration)
        return total_keys, total_size_bytes
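
For context, a minimal usage sketch, assuming this private method is reached
through the public S3Client.copy entry point (which dispatches directory
sources to _copy_dir); the bucket names and paths are hypothetical:

    from luigi.contrib.s3 import S3Client

    client = S3Client()  # credentials resolved by boto3's default chain
    # Directory source: copy() delegates to _copy_dir internally.
    client.copy('s3://src-bucket/logs/2024/',
                's3://dst-bucket/archive/2024/',
                threads=20,
                part_size=16 * 1024 * 1024)  # 16 MiB multipart chunks

Note that concurrency is layered: the ThreadPool runs up to threads copy jobs
at once, and each job's TransferConfig permits up to threads more workers for
multipart parts, so peak thread usage for large objects can approach
threads * threads.

The path helpers used above are defined elsewhere in S3Client; as a rough
sketch of the semantics this method relies on (illustrative stand-ins, not
luigi's actual implementations):

    from urllib.parse import urlparse

    def path_to_bucket_and_key(path):
        # 's3://bucket/some/key' -> ('bucket', 'some/key')
        parsed = urlparse(path)
        return parsed.netloc, parsed.path.lstrip('/')

    def add_path_delimiter(key):
        # Ensure a non-empty key ends with '/' so prefix slicing lines up.
        return key if key == '' or key.endswith('/') else key + '/'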