in luigi/contrib/s3.py
def _copy_dir(self, source_path, destination_path, threads=DEFAULT_THREADS,
              start_time=None, end_time=None, part_size=DEFAULT_PART_SIZE, **kwargs):
    start = datetime.datetime.now()
    copy_jobs = []
    # Pool of management threads; each scheduled job runs one boto3 managed copy,
    # which itself uses up to `threads` concurrent parts via TransferConfig.
    management_pool = ThreadPool(processes=threads)
    transfer_config = TransferConfig(max_concurrency=threads, multipart_chunksize=part_size)
    src_bucket, src_key = self._path_to_bucket_and_key(source_path)
    dst_bucket, dst_key = self._path_to_bucket_and_key(destination_path)
    src_prefix = self._add_path_delimiter(src_key)
    dst_prefix = self._add_path_delimiter(dst_key)
    key_path_len = len(src_prefix)
    total_size_bytes = 0
    total_keys = 0
    for item in self.list(source_path, start_time=start_time, end_time=end_time, return_key=True):
        path = item.key[key_path_len:]
        # prevents copy attempt of empty key in folder
        if path != '' and path != '/':
            total_keys += 1
            total_size_bytes += item.size
            copy_source = {
                'Bucket': src_bucket,
                'Key': src_prefix + path
            }
            # Any extra keyword arguments are forwarded to boto3 as ExtraArgs on each copy
            the_kwargs = {'Config': transfer_config, 'ExtraArgs': kwargs}
            job = management_pool.apply_async(self.s3.meta.client.copy,
                                              args=(copy_source, dst_bucket, dst_prefix + path),
                                              kwds=the_kwargs)
            copy_jobs.append(job)

    # Wait for the pool to finish scheduling all the copies
    management_pool.close()
    management_pool.join()

    # Raise any errors encountered in any of the copy processes
    for result in copy_jobs:
        result.get()

    end = datetime.datetime.now()
    duration = end - start
    logger.info('%s : Complete : %s total keys copied in %s' %
                (datetime.datetime.now(), total_keys, duration))

    return total_keys, total_size_bytes
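
For context, a minimal usage sketch follows. It assumes the public S3Client.copy entry point dispatches to _copy_dir when the source path is a directory and returns the (total_keys, total_size_bytes) tuple produced above; the bucket names, prefixes, and the MetadataDirective extra argument are illustrative placeholders, not part of the code shown.

from luigi.contrib.s3 import S3Client

client = S3Client()  # credentials resolved via the usual luigi/boto3 configuration lookup

# Recursively copy every key under the source prefix with 10 management threads.
# Extra keyword arguments (e.g. MetadataDirective) are forwarded to boto3 as ExtraArgs.
total_keys, total_size_bytes = client.copy(
    's3://my-bucket/raw/2024-01-01/',       # hypothetical source prefix
    's3://my-bucket/archive/2024-01-01/',   # hypothetical destination prefix
    threads=10,
    MetadataDirective='COPY',
)
print('copied %d keys, %d bytes' % (total_keys, total_size_bytes))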