in gemini/sample-apps/genwealth/function-scripts/analyze-prospectus/main.py [0:0]
def analyze_prospectus(cloud_event):
"""Function to analyze prospectus"""
# Print out the data from Pub/Sub, to prove that it worked
ticker = base64.b64decode(cloud_event.data["message"]["data"])
ticker = ticker.decode("utf-8")
print(ticker)
# Environment Vars
region = os.environ["REGION"]
project_id = os.environ["PROJECT_ID"]
# AlloyDB Vars
cluster = "alloydb-cluster"
instance = "alloydb-instance"
database = "ragdemos"
table_name = "langchain_vector_store"
user = "postgres"
password = os.environ["ALLOYDB_PASSWORD"]
# Setup sync connector
connector = Connector()
def getconn():
conn = connector.connect(
f"projects/{project_id}/locations/{region}/clusters/{cluster}/instances/{instance}",
"pg8000",
user=user,
password=password,
db=database,
)
return conn
# create connection pool
pool = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=getconn,
)
# Prep SQL statement
sql = f"SELECT content FROM {table_name} WHERE ticker = '{ticker}' ORDER BY page, page_chunk"
# Prep model and template
model = VertexAI(
model_name="gemini-2.0-flash", max_output_tokens=1024, temperature=0.0
)
template = """
<MISSION>
You are an experienced financial analyst. Your mission is to create a detailed
company financial overview for {ticker} using their latest prospectus. I will be
sending you the prospectus a few chunks at a time. There are a total of
{total_chunk_count} prospectus chunks, and I am sending you prospectus chunk numbers
{first_chunk}-{last_chunk} as part of this request.
</MISSION>
<TASK>
Use the financial overview labeled <OVERVIEW> below, and use the additional details from
the section labeled <ADDITIONAL_CONTEXT> below to improve the financial overview in the <OVERVIEW>.
Respond using less than 4000 characters, including whitespace.
</TASK>
<OVERVIEW>
{previous_overview}
</OVERVIEW>
<ADDITIONAL_CONTEXT>
{chunk_text}
</ADDITIONAL_CONTEXT>"""
prompt = PromptTemplate.from_template(template)
# Create overview of full document by iterating through chunks
with pool.connect() as db_conn:
# query database
result = db_conn.execute(sqlalchemy.text(sql)).fetchall()
# commit transaction (SQLAlchemy v2.X.X is commit as you go)
db_conn.commit()
# Iterate through results
total_chunk_count = len(result)
overview = ""
chunk_text = ""
first_chunk = 1
last_chunk = 1
for i in range(len(result)):
current_chunk = i + 1
first_chunk = min(first_chunk, current_chunk)
last_chunk = max(last_chunk, current_chunk)
# Add text to chunk_text until token window is full
chunk_text = chunk_text + str(result[i].content) + " "
if len(chunk_text) < 50000:
continue
# Invoke the model
print(
f"Adding chunks {first_chunk} through {last_chunk} out of {total_chunk_count} to {ticker} overview..."
)
fmt_prompt = prompt.format(
total_chunk_count=total_chunk_count,
first_chunk=first_chunk,
last_chunk=last_chunk,
previous_overview=overview,
chunk_text=chunk_text,
ticker=ticker,
)
overview = model.invoke(fmt_prompt)
# Reset first_chunk and chunk_text values
first_chunk = current_chunk + 1
chunk_text = ""
analysis = model.invoke(
f"You are an experienced financial analyst. Write a financial analysis for ticker {ticker} that includes an Investment Rating (buy, sell, or hold), Investment Risk (high, medium, low), Target Investor (conservative, neutral, aggressive) and a two-paragraph analysis. Use the following company overview as context for the analysis: \n\n{overview}"
)
rating = model.invoke(
f"Answering with only 1 word, classify ticker {ticker} as one of [BUY, SELL, HOLD] based on the following analysis: {analysis}"
)
rating = rating.strip()
insert_stmt = sqlalchemy.text(
"INSERT INTO investments (id, ticker, etf, market, rating, overview, analysis) VALUES (:id, :ticker, :etf, :market, :rating, :overview, :analysis)"
)
with pool.connect() as db_conn:
max_id = db_conn.execute(
sqlalchemy.text("SELECT MAX(id) FROM investments")
).fetchall()
new_id = max_id[0][0] + 1
print(new_id)
# insert into database
db_conn.execute(
insert_stmt,
parameters={
"id": new_id,
"ticker": ticker,
"etf": False,
"market": "US",
"rating": rating,
"overview": overview,
"analysis": analysis,
},
)
# commit transaction (SQLAlchemy v2.X.X is commit as you go)
db_conn.commit()
print("Finished insert")
print("Closing database connection.")
connector.close()
print(f"Finished analyzing ticker {ticker}.")