def process_tar_index()

in create_only_with_pdfs/load_data.py


import os
from concurrent.futures import ThreadPoolExecutor

import datasets
import pandas as pd
from tqdm import tqdm

# Note: DATA_PATH, TAR_FILE_PATTERN and process_group are module-level names in
# load_data.py and are not shown in this excerpt.


def process_tar_index(tar_index, step_size, question_answer_df):
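    """Build and save one shard of the PDF dataset.

    Loads `step_size` consecutive WebDataset tar files starting at `tar_index`,
    keeps only the samples whose `__key__` appears in `question_answer_df`,
    extracts the PDF/conversation pairs in parallel, and saves the result to
    disk as a Hugging Face dataset shard.
    """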
    shard_nr = tar_index // step_size
    loaded_datasets = []

    for inner_idx in range(step_size):
        tar_file = os.path.join(DATA_PATH, TAR_FILE_PATTERN.format(tar_index+inner_idx))
        try:
            print(f"Loading dataset from: {tar_file}")
            hf_dataset = datasets.load_dataset(
                'webdataset', split='train', data_files=tar_file, cache_dir="/fsx/.cache"
            ).to_pandas()
            # Keys are loaded as strings; convert to numeric so they match question_answer_df's '__key__'.
            hf_dataset['__key__'] = pd.to_numeric(hf_dataset['__key__'])
            loaded_datasets.append(hf_dataset)
        except Exception as e:
            print(f"Error loading dataset from: {tar_file}")
            print(e)

    hf_dataset = pd.concat(loaded_datasets, ignore_index=True)
    print(f"Concatenated datasets with {len(hf_dataset)} samples")

    # Keep only the samples whose '__key__' appears in question_answer_df
    hf_dataset = hf_dataset[hf_dataset['__key__'].isin(question_answer_df['__key__'].unique())]

    # Merging dataframes on '__key__' column
    merged_df = pd.merge(hf_dataset, question_answer_df, on='__key__', how='inner')

    # Using ThreadPoolExecutor for parallel processing of groups
    data_extracted = []
    max_threads = 10  # Number of threads to use
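    # process_group (defined elsewhere in load_data.py) is called once per '__key__' group with a
    # (key, group_df) tuple and is expected to return a dict matching FEATURES below, or None on failure.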
    with ThreadPoolExecutor(max_threads) as executor:
        results = list(tqdm(executor.map(process_group, merged_df.groupby('__key__')),
                            desc='Extracting data', total=merged_df['__key__'].nunique()))

    data_extracted.extend(results)
    data_extracted = list(filter(lambda item: item is not None, data_extracted)) # Filter out None values
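    # Each extracted sample pairs the raw PDF bytes with its list of user/assistant/source turns.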

    FEATURES = datasets.Features(
        {
            "pdf": datasets.Value("binary"),
            "texts": [
                {
                    "user": datasets.Value("string"),
                    "assistant": datasets.Value("string"),
                    "source": datasets.Value("string"),
                }
            ],
        }
    )

    def data_generator():
        for data_dict in data_extracted:
            yield data_dict

    ds_shard = datasets.Dataset.from_generator(
        data_generator, features=FEATURES, writer_batch_size=100, cache_dir="/fsx/.cache"
    )
    ds_shard.save_to_disk(f'/fsx/m4/datasets/docmatix_pdf/shard_{shard_nr}')
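
A minimal sketch of how this function could be driven over a full set of tar files; the step size, tar-file count, and parquet path below are illustrative assumptions, not values taken from load_data.py:

# Hypothetical driver loop; STEP_SIZE, NUM_TAR_FILES and the parquet path are assumptions.
if __name__ == "__main__":
    STEP_SIZE = 10        # assumed number of tar files merged into one shard
    NUM_TAR_FILES = 200   # assumed total number of tar files
    # Assumed: a parquet file holding the question/answer table with a '__key__' column.
    question_answer_df = pd.read_parquet("/fsx/m4/datasets/docmatix_qa.parquet")
    for tar_index in range(0, NUM_TAR_FILES, STEP_SIZE):
        process_tar_index(tar_index, STEP_SIZE, question_answer_df)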