def tar_file_iterator_with_meta()

in modules/SwissArmyTransformer/sat/data_utils/webds.py [0:0]


def _read_meta_jsonl(meta_stream, meta_names, meta_file_name):
    """Parse a JSON-lines metadata stream into ``{key: {meta_name: value}}``.

    Lines that fail to parse as JSON are reported at DEBUG level and skipped.
    The stream is always closed before returning, even if a record is
    malformed (e.g. has no ``'key'`` entry) and an exception propagates.

    :param meta_stream: iterable of JSON lines exposing ``close()``
    :param meta_names: names of the fields to keep from every record
    :param meta_file_name: label used only in the parse-error log message
    """
    meta_data = {}  # {id: {meta_name: meta_value, meta_name2: meta_value2, ...}}
    try:
        for lineno, line in enumerate(meta_stream):
            try:
                item = json.loads(line)
            except Exception:
                from sat.helpers import print_rank0
                print_rank0(f'Error in loading jsonl {meta_file_name}, lineno {lineno}: {line}', level='DEBUG')
                continue
            # Several lines may share a key; later values overwrite earlier ones.
            record = meta_data.setdefault(item['key'], {})
            for meta_name in meta_names:
                if meta_name in item:
                    record[meta_name] = item[meta_name]
    finally:
        meta_stream.close()
    return meta_data


def tar_file_iterator_with_meta(fileobj, meta_names, skip_meta=r"__[^/]*__($|/)", suffix=None, handler=reraise_exception, meta_stream=None):
    """Iterate over tar file, yielding ``dict(fname=..., data=...)`` samples for the given tar stream.

    Every regular member of the archive is yielded with its raw bytes as
    ``data``. Directly after a member whose name ends with ``.id``, one extra
    sample per entry of ``meta_names`` is yielded, named
    ``<id>.<meta_name>``, whose ``data`` is the value found in the metadata
    records for that id (``None`` when absent).

    Metadata records are read from ``meta_stream`` when given; otherwise from
    a sidecar file located next to the tar, named after the tar file name up
    to its first ``.`` plus ``.meta.jsonl``, if that file exists.

    Errors raised while iterating the archive are handled best-effort: an
    exception that escapes the per-member handling (including one re-raised by
    ``handler``) is printed and ends the iteration instead of propagating.
    Errors raised while loading metadata records (e.g. a record without a
    ``'key'`` entry) do propagate.

    :param fileobj: byte stream suitable for tarfile; its ``name`` attribute is
        only needed when ``meta_stream`` is not given
    :param meta_names: key of different items in meta file
    :param skip_meta: regexp for keys that are skipped entirely (Default value = r"__[^/]*__($|/)")
    :param suffix: if not None, text appended to the decoded content of every
        ``.txt`` member before it is re-encoded
    :param handler: called with the exception raised while processing a member;
        a truthy return value continues with the next member, a falsy one
        stops the iteration
    :param meta_stream: optional already-opened JSON-lines stream of metadata
        records; it is consumed and closed by this function

    """
    stream = tarfile.open(fileobj=fileobj, mode="r|*")
    try:
        if meta_stream is None:
            # os.path.split also copes with names that contain no directory
            # part, where unpacking a '/'-split would fail.
            data_dir, filename = os.path.split(fileobj.name)
            meta_file_name = filename.split('.')[0] + '.meta.jsonl'
            meta_path = os.path.join(data_dir, meta_file_name)
            if os.path.exists(meta_path):
                meta_stream = open(meta_path, 'r')
        else:
            # The name is only used for log messages, so tolerate streams
            # (e.g. in-memory ones) that do not expose it.
            meta_file_name = getattr(meta_stream, 'name', repr(meta_stream))

        meta_data = {}
        if meta_stream is not None:
            meta_data = _read_meta_jsonl(meta_stream, meta_names, meta_file_name)

        try:
            for tarinfo in stream:
                fname = tarinfo.name
                try:
                    if not tarinfo.isreg():
                        continue
                    if fname is None:
                        continue
                    if (
                        "/" not in fname
                        and fname.startswith("__")
                        and fname.endswith("__")
                    ):
                        # skipping metadata for now
                        continue
                    if skip_meta is not None and re.match(skip_meta, fname):
                        continue
                    data = stream.extractfile(tarinfo).read()
                    if suffix is not None and fname.endswith('.txt'):
                        data = (data.decode() + suffix).encode()
                    yield dict(fname=fname, data=data)

                    if fname.endswith('.id'):
                        fid = fname.split('.')[0]
                        meta_data_fid = meta_data.get(fid, {})
                        for meta_name in meta_names:
                            yield dict(
                                fname=fid + '.' + meta_name,
                                data=meta_data_fid.get(meta_name, None),
                            )
                    # Drop the member list tarfile accumulates so memory does
                    # not grow with the number of members read.
                    stream.members = []
                except Exception as exn:
                    if hasattr(exn, "args") and len(exn.args) > 0:
                        # args[0] is not always a string (e.g. the errno of an
                        # OSError); convert it so annotating cannot itself fail.
                        exn.args = (str(exn.args[0]) + " @ " + str(fileobj),) + exn.args[1:]
                    if handler(exn):
                        continue
                    else:
                        break
        except Exception as exn:
            # Best-effort: a broken archive must not take down the consumer.
            print(exn)
    finally:
        # Releases the tarfile wrapper; the caller's fileobj stays open.
        stream.close()