def process_pdf()

in gemini/sample-apps/genwealth/function-scripts/process-pdf/main.py [0:0]


def process_pdf(cloud_event):
    """Main function"""
    data = cloud_event.data

    event_id = cloud_event["id"]
    event_type = cloud_event["type"]

    bucket = data["bucket"]
    name = data["name"]
    metageneration = data["metageneration"]
    timeCreated = data["timeCreated"]
    updated = data["updated"]

    print(f"Event ID: {event_id}")
    print(f"Event type: {event_type}")
    print(f"Bucket: {bucket}")
    print(f"File: {name}")
    print(f"Metageneration: {metageneration}")
    print(f"Created: {timeCreated}")
    print(f"Updated: {updated}")

    file_name, extension = os.path.splitext(name)
    if extension != ".pdf":
        print("File is not a PDF. Please submit a PDF for processing instead.")
        return

    # Project vars
    region = os.environ["REGION"]
    project_id = os.environ["PROJECT_ID"]

    # Document AI Vars
    source_file = f"gs://{bucket}/{name}"
    gcs_output_uri = f"gs://{project_id}-doc-ai/doc-ai-output/"  # Must end with a trailing slash `/`. Format: gs://bucket/directory/subdirectory/
    location = "us"  # Format is "us" or "eu"
    processor_id = os.environ["PROCESSOR_ID"]  # Create processor before running sample

    blobs = batch_process_documents(
        project_id=project_id,
        location=location,
        processor_id=processor_id,
        gcs_output_uri=gcs_output_uri,
        gcs_input_uri=source_file,  # Format: gs://bucket/directory/file.pdf
        input_mime_type="application/pdf",
    )

    # Document AI may output multiple JSON files per source file
    lc_doc = []
    for blob in blobs:
        # Document AI should only output JSON files to GCS
        if blob.content_type != "application/json":
            print(
                f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}"
            )
            continue

        # Download JSON File as bytes object and convert to Document Object
        print(f"Fetching {blob.name}")
        document = documentai.Document.from_json(
            blob.download_as_bytes(), ignore_unknown_fields=True
        )

        # Create LangChain doc
        page = Document(
            page_content=document.text,
            metadata={
                "source": source_file,
                "page": document.shard_info.shard_index + 1,
                "ticker": Path(source_file).stem,
                "page_size": len(document.text),
                "doc_ai_shard_count": document.shard_info.shard_count,
                "doc_ai_shard_index": document.shard_info.shard_index,
                "doc_ai_chunk_size": blob._CHUNK_SIZE_MULTIPLE,
                "doc_ai_chunk_uri": blob.public_url,
            },
        )
        lc_doc.append(page)

    # Split docs into smaller chunks (max 3072 tokens, 9216 characters)
    lc_doc_chunks = split_document(lc_doc)

    # Setup embeddings
    embedding = VertexAIEmbeddings(model_name="text-embedding-005", project=project_id)

    # AlloyDB Vars
    cluster = "alloydb-cluster"
    instance = "alloydb-instance"
    database = "ragdemos"
    table_name = "langchain_vector_store"
    user = "postgres"
    password = os.environ["ALLOYDB_PASSWORD"]
    initialize_vector_store = False
    ip_type = os.environ["IP_TYPE"]

    # Create vector store
    engine = AlloyDBEngine.from_instance(
        project_id=project_id,
        region=region,
        cluster=cluster,
        instance=instance,
        database=database,
        user=user,
        password=password,
        ip_type=ip_type,
    )

    if initialize_vector_store:
        engine.init_vectorstore_table(
            table_name=table_name,
            vector_size=768,  # Vector size for VertexAI model(text-embedding-005)
            metadata_columns=[
                Column("source", "VARCHAR", nullable=True),
                Column("page", "INTEGER", nullable=True),
                Column("ticker", "VARCHAR", nullable=True),
                Column("page_size", "INTEGER", nullable=True),
                Column("doc_ai_shard_count", "INTEGER", nullable=True),
                Column("doc_ai_shard_index", "INTEGER", nullable=True),
                Column("doc_ai_chunk_size", "INTEGER", nullable=True),
                Column("doc_ai_chunk_uri", "VARCHAR", nullable=True),
                Column("page_chunk", "INTEGER", nullable=True),
                Column("chunk_size", "INTEGER", nullable=True),
            ],
            overwrite_existing=True,
        )

    store = AlloyDBVectorStore.create_sync(
        engine=engine,
        table_name=table_name,
        embedding_service=embedding,
        metadata_columns=[
            "source",
            "page",
            "ticker",
            "page_size",
            "doc_ai_shard_count",
            "doc_ai_shard_index",
            "doc_ai_chunk_size",
            "doc_ai_chunk_uri",
            "page_chunk",
            "chunk_size",
        ],
    )

    ids = [str(uuid.uuid4()) for i in range(len(lc_doc_chunks))]
    store.add_documents(lc_doc_chunks, ids)

    print("Finished processing pdf")

    # Send message to pubsub topic to kick off next step
    ticker = Path(source_file).stem
    publisher = pubsub_v1.PublisherClient()
    topic_name = f"projects/{project_id}/topics/{project_id}-doc-ready"
    future = publisher.publish(topic_name, bytes(f"{ticker}".encode()), spam="done")
    future.result()
    print("Sent message to pubsub")