in torchtext/utils.py [0:0]
def _is_within_directory(directory, target):
    """Return True if ``target`` lies lexically inside ``directory``."""
    root = os.path.abspath(directory)
    resolved = os.path.abspath(target)
    try:
        return os.path.commonpath([root, resolved]) == root
    except ValueError:
        # commonpath raises when the two paths cannot be compared
        # (e.g. different drives on Windows): treat that as "outside".
        return False


def extract_archive(from_path, to_path=None, overwrite=False):
    """Extract a ``.tar.gz``/``.tgz``, ``.zip`` or ``.gz`` archive.

    Args:
        from_path: the path of the archive.
        to_path: the root path of the extracted files (directory of
            from_path). Only used for tar and zip archives; a plain ``.gz``
            file is always decompressed next to the archive, to ``from_path``
            with the trailing ``.gz`` removed.
        overwrite: overwrite existing files (False). Only used for tar and
            zip archives; a plain ``.gz`` file is always rewritten.

    Returns:
        List of paths to extracted files even if not overwritten.
        For tar archives the list holds the regular-file members, for zip
        archives the members that exist as files after extraction, and for
        ``.gz`` files the single decompressed path.

    Raises:
        ValueError: if a tar member would be written outside ``to_path``.
        AssertionError: if a ``.zip`` path is not a valid zip file.
        NotImplementedError: if the file extension is not supported.

    Examples:
        >>> url = 'http://www.quest.dcs.shef.ac.uk/wmt16_files_mmt/validation.tar.gz'
        >>> from_path = './validation.tar.gz'
        >>> to_path = './'
        >>> torchtext.utils.download_from_url(url, from_path)
        >>> torchtext.utils.extract_archive(from_path, to_path)
        ['.data/val.de', '.data/val.en']
    """
    if to_path is None:
        to_path = os.path.dirname(from_path)

    if from_path.endswith(('.tar.gz', '.tgz')):
        logging.info('Opening tar file {}.'.format(from_path))
        # Mode 'r' lets tarfile detect the compression transparently.
        with tarfile.open(from_path, 'r') as tar:
            files = []
            for file_ in tar:
                file_path = os.path.join(to_path, file_.name)
                # Archives usually come from the network: refuse member
                # names ('..' components, absolute paths) that would land
                # outside the requested extraction root.
                if not _is_within_directory(to_path, file_path):
                    raise ValueError(
                        'Refusing to extract {} outside of {}.'.format(
                            file_.name, to_path))
                if file_.isfile():
                    files.append(file_path)
                    if os.path.exists(file_path):
                        logging.info('{} already extracted.'.format(file_path))
                        if not overwrite:
                            continue
                tar.extract(file_, to_path)
            logging.info('Finished extracting tar file {}.'.format(from_path))
            return files

    elif from_path.endswith('.zip'):
        assert zipfile.is_zipfile(from_path), from_path
        logging.info('Opening zip file {}.'.format(from_path))
        with zipfile.ZipFile(from_path, 'r') as zfile:
            files = []
            for file_ in zfile.namelist():
                file_path = os.path.join(to_path, file_)
                files.append(file_path)
                if os.path.exists(file_path):
                    logging.info('{} already extracted.'.format(file_path))
                    if not overwrite:
                        continue
                zfile.extract(file_, to_path)
        # namelist() also yields directory entries; report only real files.
        files = [f for f in files if os.path.isfile(f)]
        logging.info('Finished extracting zip file {}.'.format(from_path))
        return files

    elif from_path.endswith('.gz'):
        logging.info('Opening gz file {}.'.format(from_path))
        default_block_size = 65536
        # Strip the trailing '.gz' to get the output file name.
        filename = from_path[:-3]
        files = [filename]
        with gzip.open(from_path, 'rb') as gzfile, \
                open(filename, 'wb') as d_file:
            # Stream in fixed-size blocks so large files are never held in
            # memory at once; read() returns b'' at end of stream.
            for block in iter(lambda: gzfile.read(default_block_size), b''):
                d_file.write(block)
        logging.info('Finished extracting gz file {}.'.format(from_path))
        return files

    else:
        raise NotImplementedError(
            "We currently only support tar.gz, .tgz, .gz and zip achives.")