in gemini/sample-apps/genwealth/function-scripts/process-pdf/main.py [0:0]
import os
import uuid
from pathlib import Path

import functions_framework
from google.cloud import documentai, pubsub_v1
from langchain_core.documents import Document
from langchain_google_alloydb_pg import AlloyDBEngine, AlloyDBVectorStore, Column
from langchain_google_vertexai import VertexAIEmbeddings


# Triggered by a change in a Cloud Storage bucket (e.g. a new PDF upload)
@functions_framework.cloud_event
def process_pdf(cloud_event):
    """Extract text from an uploaded PDF with Document AI, chunk and embed it,
    load the chunks into an AlloyDB vector store, and notify the next step."""
    data = cloud_event.data
    event_id = cloud_event["id"]
    event_type = cloud_event["type"]
    bucket = data["bucket"]
    name = data["name"]
    metageneration = data["metageneration"]
    timeCreated = data["timeCreated"]
    updated = data["updated"]
    print(f"Event ID: {event_id}")
    print(f"Event type: {event_type}")
    print(f"Bucket: {bucket}")
    print(f"File: {name}")
    print(f"Metageneration: {metageneration}")
    print(f"Created: {timeCreated}")
    print(f"Updated: {updated}")
    file_name, extension = os.path.splitext(name)
    if extension.lower() != ".pdf":  # accept .pdf and .PDF
        print("File is not a PDF. Please submit a PDF for processing instead.")
        return
    # Project vars
    region = os.environ["REGION"]
    project_id = os.environ["PROJECT_ID"]

    # Document AI vars
    source_file = f"gs://{bucket}/{name}"
    gcs_output_uri = f"gs://{project_id}-doc-ai/doc-ai-output/"  # Must end with a trailing slash `/`. Format: gs://bucket/directory/subdirectory/
    location = "us"  # Format is "us" or "eu"
    processor_id = os.environ["PROCESSOR_ID"]  # Create processor before running sample
    blobs = batch_process_documents(
        project_id=project_id,
        location=location,
        processor_id=processor_id,
        gcs_output_uri=gcs_output_uri,
        gcs_input_uri=source_file,  # Format: gs://bucket/directory/file.pdf
        input_mime_type="application/pdf",
    )
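    # batch_process_documents and split_document are assumed to be helper
    # functions defined elsewhere in this module; the former returns the
    # Cloud Storage blobs that Document AI wrote under gcs_output_uri.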
    # Document AI may output multiple JSON files per source file
    lc_doc = []
    for blob in blobs:
        # Document AI should only output JSON files to GCS
        if blob.content_type != "application/json":
            print(
                f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}"
            )
            continue

        # Download the JSON file as a bytes object and convert it to a Document object
        print(f"Fetching {blob.name}")
        document = documentai.Document.from_json(
            blob.download_as_bytes(), ignore_unknown_fields=True
        )

        # Create LangChain doc
        page = Document(
            page_content=document.text,
            metadata={
                "source": source_file,
                "page": document.shard_info.shard_index + 1,
                "ticker": Path(source_file).stem,
                "page_size": len(document.text),
                "doc_ai_shard_count": document.shard_info.shard_count,
                "doc_ai_shard_index": document.shard_info.shard_index,
                "doc_ai_chunk_size": blob._CHUNK_SIZE_MULTIPLE,
                "doc_ai_chunk_uri": blob.public_url,
            },
        )
        lc_doc.append(page)
    # Split docs into smaller chunks (max 3072 tokens, 9216 characters)
    lc_doc_chunks = split_document(lc_doc)

    # Set up embeddings
    embedding = VertexAIEmbeddings(model_name="text-embedding-005", project=project_id)
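    # text-embedding-005 returns 768-dimensional vectors, which is why the
    # vector store table below is created with vector_size=768.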
    # AlloyDB vars
    cluster = "alloydb-cluster"
    instance = "alloydb-instance"
    database = "ragdemos"
    table_name = "langchain_vector_store"
    user = "postgres"
    password = os.environ["ALLOYDB_PASSWORD"]
    initialize_vector_store = False
    ip_type = os.environ["IP_TYPE"]

    # Create the AlloyDB engine (connection) backing the vector store
    engine = AlloyDBEngine.from_instance(
        project_id=project_id,
        region=region,
        cluster=cluster,
        instance=instance,
        database=database,
        user=user,
        password=password,
        ip_type=ip_type,
    )
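    # One-time setup: set initialize_vector_store to True to (re)create the
    # table with a 768-dimension embedding column and the metadata columns
    # that mirror the LangChain document metadata populated above.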
    if initialize_vector_store:
        engine.init_vectorstore_table(
            table_name=table_name,
            vector_size=768,  # Vector size for Vertex AI model (text-embedding-005)
            metadata_columns=[
                Column("source", "VARCHAR", nullable=True),
                Column("page", "INTEGER", nullable=True),
                Column("ticker", "VARCHAR", nullable=True),
                Column("page_size", "INTEGER", nullable=True),
                Column("doc_ai_shard_count", "INTEGER", nullable=True),
                Column("doc_ai_shard_index", "INTEGER", nullable=True),
                Column("doc_ai_chunk_size", "INTEGER", nullable=True),
                Column("doc_ai_chunk_uri", "VARCHAR", nullable=True),
                Column("page_chunk", "INTEGER", nullable=True),
                Column("chunk_size", "INTEGER", nullable=True),
            ],
            overwrite_existing=True,
        )
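    # Attach a LangChain vector store to the table; the metadata columns listed
    # here must match the table schema created by init_vectorstore_table.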
    store = AlloyDBVectorStore.create_sync(
        engine=engine,
        table_name=table_name,
        embedding_service=embedding,
        metadata_columns=[
            "source",
            "page",
            "ticker",
            "page_size",
            "doc_ai_shard_count",
            "doc_ai_shard_index",
            "doc_ai_chunk_size",
            "doc_ai_chunk_uri",
            "page_chunk",
            "chunk_size",
        ],
    )
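    # Embed each chunk and insert it into AlloyDB, one row per chunk, keyed by a random UUID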
    ids = [str(uuid.uuid4()) for _ in range(len(lc_doc_chunks))]
    store.add_documents(lc_doc_chunks, ids=ids)
    print("Finished processing pdf")
    # Send a message to the Pub/Sub topic to kick off the next step
    ticker = Path(source_file).stem
    publisher = pubsub_v1.PublisherClient()
    topic_name = f"projects/{project_id}/topics/{project_id}-doc-ready"
    future = publisher.publish(topic_name, ticker.encode("utf-8"), spam="done")
    future.result()
    print("Sent message to pubsub")