scripts/process_jsonl/process_jsonl.py
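"""Process a JSONL dump of documents and upsert them into a datastore.

Example invocation (a sketch; the file name and metadata values below are
placeholders, not files that ship with the repository):

    python scripts/process_jsonl/process_jsonl.py \
        --filepath dump.jsonl \
        --custom_metadata '{"source": "file"}' \
        --screen_for_pii \
        --extract_metadata
"""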
import json
import argparse
import asyncio
from loguru import logger
from models.models import Document, DocumentMetadata
from datastore.datastore import DataStore
from datastore.factory import get_datastore
from services.extract_metadata import extract_metadata_from_document
from services.pii_detection import screen_text_for_pii

# number of documents per datastore.upsert call in the batching loop below
DOCUMENT_UPSERT_BATCH_SIZE = 50


async def process_jsonl_dump(
filepath: str,
datastore: DataStore,
custom_metadata: dict,
screen_for_pii: bool,
extract_metadata: bool,
):
    # read the jsonl file into a list of dictionaries, one per non-empty line
    with open(filepath) as jsonl_file:
        data = [json.loads(line) for line in jsonl_file if line.strip()]
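
    # each line of the dump is expected to be a standalone JSON object; the
    # keys below are the ones this script reads, and the values are
    # illustrative placeholders:
    # {"id": "...", "text": "...", "source": "...", "source_id": "...",
    #  "url": "...", "created_at": "...", "author": "..."}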
documents = []
skipped_items = []
    # iterate over the data and create document objects
    for item_index, item in enumerate(data):
        # log progress every 20 items
        if item_index and item_index % 20 == 0:
            logger.info(f"Processed {item_index} of {len(data)} items")
try:
# get the id, text, source, source_id, url, created_at and author from the item
# use default values if not specified
            id = item.get("id")
            text = item.get("text")
            source = item.get("source")
            source_id = item.get("source_id")
            url = item.get("url")
            created_at = item.get("created_at")
            author = item.get("author")
if not text:
logger.info("No document text, skipping...")
continue
# create a metadata object with the source, source_id, url, created_at and author
metadata = DocumentMetadata(
source=source,
source_id=source_id,
url=url,
created_at=created_at,
author=author,
)
# update metadata with custom values
for key, value in custom_metadata.items():
if hasattr(metadata, key):
setattr(metadata, key, value)
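            # e.g. --custom_metadata '{"author": "jane"}' (a hypothetical
            # value) overwrites the author parsed from the item itself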
# screen for pii if requested
if screen_for_pii:
pii_detected = screen_text_for_pii(text)
                # if pii is detected, log a warning and skip the document
                if pii_detected:
                    logger.warning("PII detected in document, skipping")
skipped_items.append(item) # add the skipped item to the list
continue
# extract metadata if requested
if extract_metadata:
# extract metadata from the document text
extracted_metadata = extract_metadata_from_document(
f"Text: {text}; Metadata: {str(metadata)}"
)
                # build a DocumentMetadata object from the extracted fields
                metadata = DocumentMetadata(**extracted_metadata)
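                # note that this replaces the metadata assembled above, including
                # any custom values; those survive only if the extraction model
                # echoes them back from the prompt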
# create a document object with the id, text and metadata
document = Document(
id=id,
text=text,
metadata=metadata,
)
documents.append(document)
except Exception as e:
# log the error and continue with the next item
logger.error(f"Error processing {item}: {e}")
skipped_items.append(item) # add the skipped item to the list
    # upsert the documents in batches; datastore.upsert batches internally as
    # well, but batching here allows more descriptive logging
    for i in range(0, len(documents), DOCUMENT_UPSERT_BATCH_SIZE):
        # slice out the documents in the current batch
        batch_documents = documents[i : i + DOCUMENT_UPSERT_BATCH_SIZE]
        logger.info(
            f"Upserting batch {i // DOCUMENT_UPSERT_BATCH_SIZE + 1} "
            f"({len(batch_documents)} documents)"
        )
        await datastore.upsert(batch_documents)
    # log the skipped items
logger.info(f"Skipped {len(skipped_items)} items due to errors or PII detection")
for item in skipped_items:
logger.info(item)


async def main():
# parse the command-line arguments
parser = argparse.ArgumentParser()
parser.add_argument("--filepath", required=True, help="The path to the jsonl dump")
parser.add_argument(
"--custom_metadata",
default="{}",
help="A JSON string of key-value pairs to update the metadata of the documents",
)
    # use store_true so the options act as flags; type=bool would treat any
    # non-empty string, including "False", as True
    parser.add_argument(
        "--screen_for_pii",
        action="store_true",
        help="Screen each document for PII using a language model and skip it if PII is detected",
    )
    parser.add_argument(
        "--extract_metadata",
        action="store_true",
        help="Extract metadata from each document using a language model",
    )
args = parser.parse_args()
# get the arguments
filepath = args.filepath
custom_metadata = json.loads(args.custom_metadata)
screen_for_pii = args.screen_for_pii
extract_metadata = args.extract_metadata
    # initialize the datastore instance
datastore = await get_datastore()
# process the jsonl dump
await process_jsonl_dump(
filepath, datastore, custom_metadata, screen_for_pii, extract_metadata
)


if __name__ == "__main__":
asyncio.run(main())