in modules/SwissArmyTransformer/sat/data_utils/webds.py [0:0]
def _load_meta_jsonl(meta_stream, meta_names, meta_file_name):
    """Read a ``.meta.jsonl`` stream into ``{key: {meta_name: value, ...}}``.

    Every line is parsed as one JSON record. Records are grouped by their
    ``'key'`` field and only the fields listed in ``meta_names`` are kept.
    Lines that cannot be parsed are reported at DEBUG level and skipped.
    The stream is always closed before returning, even when a record raises.

    :param meta_stream: iterable of JSON lines exposing ``close()``
    :param meta_names: names of the fields to keep from every record
    :param meta_file_name: label used in the debug message for bad lines
    """
    meta_data = {}
    try:
        for lineno, line in enumerate(meta_stream):
            try:
                item = json.loads(line)
            except Exception:
                # Deliberately best-effort: one malformed line (including a
                # blank one) must not discard the rest of the metadata.
                from sat.helpers import print_rank0
                print_rank0(f'Error in loading jsonl {meta_file_name}, lineno {lineno}: {line}', level='DEBUG')
                continue
            # The entry is created even when none of meta_names is present.
            entry = meta_data.setdefault(item['key'], {})
            for meta_name in meta_names:
                if meta_name in item:
                    entry[meta_name] = item[meta_name]
    finally:
        meta_stream.close()
    return meta_data


def tar_file_iterator_with_meta(fileobj, meta_names, skip_meta=r"__[^/]*__($|/)", suffix=None,handler=reraise_exception, meta_stream=None):
    """Iterate over tar file, yielding filename, content pairs for the given tar stream.

    Every regular member is yielded as ``dict(fname=<member name>, data=<bytes>)``.
    Right after a member whose name ends with ``.id``, one extra
    ``dict(fname="<fid>.<meta_name>", data=<value or None>)`` is yielded per
    entry of ``meta_names``, where ``<fid>`` is the member name up to its first
    ``.`` and the value is looked up in the metadata under that same key.

    Metadata comes from ``meta_stream`` when given. Otherwise a sidecar file
    ``<tar base name>.meta.jsonl`` next to ``fileobj.name`` is used if it
    exists. When neither is available every metadata value is ``None``.

    :param fileobj: byte stream suitable for tarfile
    :param meta_names: key of different items in meta file
    :param skip_meta: regexp for keys that are skipped entirely (Default value = r"__[^/]*__($|/)")
    :param suffix: text appended to the decoded content of ``.txt`` members (Default value = None)
    :param handler: called with the exception raised while processing a member;
        a truthy return skips the member, a falsy one stops the iteration
    :param meta_stream: already opened jsonl stream; it is closed once read (Default value = None)
    """
    stream = tarfile.open(fileobj=fileobj, mode="r|*")
    meta_data = {} # {id: {meta_name: meta_value, meta_name2: meta_value2, ...}}
    meta_file_name = None

    if meta_stream is None:
        # The tar's own path is only needed to locate the sidecar file, so a
        # stream without a usable name simply has no sidecar metadata.
        source_name = getattr(fileobj, "name", None)
        if isinstance(source_name, str):
            data_dir, filename = os.path.split(source_name)
            meta_file_name = filename.split('.')[0] + '.meta.jsonl'
            meta_path = os.path.join(data_dir, meta_file_name)
            if os.path.exists(meta_path):
                meta_stream = open(meta_path, 'r')
    else:
        # In-memory streams have no name; it is only used for log messages.
        meta_file_name = getattr(meta_stream, "name", repr(meta_stream))

    if meta_stream is not None:
        meta_data = _load_meta_jsonl(meta_stream, meta_names, meta_file_name)

    # NOTE: anything escaping the per-member handling below, including an
    # exception raised by ``handler`` itself, is caught here and only printed,
    # so the iteration then ends quietly instead of propagating to the caller.
    try:
        for tarinfo in stream:
            fname = tarinfo.name
            try:
                if not tarinfo.isreg():
                    continue
                if fname is None:
                    continue
                if (
                    "/" not in fname
                    and fname.startswith("__")
                    and fname.endswith("__")
                ):
                    # skipping metadata for now
                    continue
                if skip_meta is not None and re.match(skip_meta, fname):
                    continue
                if fname.endswith('.txt') and suffix is not None:
                    data = (stream.extractfile(tarinfo).read().decode() + suffix).encode()
                else:
                    data = stream.extractfile(tarinfo).read()
                yield dict(fname=fname, data=data)

                if fname.endswith('.id'):
                    fid = fname.split('.')[0]
                    meta_data_fid = meta_data.get(fid, {})
                    for meta_name in meta_names:
                        meta_fname = fid + '.' + meta_name
                        yield dict(fname=meta_fname, data=meta_data_fid.get(meta_name, None))
                # Drop the members recorded so far; presumably this keeps the
                # TarFile object from growing with the size of the archive.
                stream.members = []
            except Exception as exn:
                if exn.args:
                    # The first argument is not always a string (e.g. an errno),
                    # so coerce it before appending the source of the failure.
                    exn.args = (str(exn.args[0]) + " @ " + str(fileobj),) + exn.args[1:]
                if not handler(exn):
                    break
    except Exception as exn:
        print(exn)
    del stream