in genai-for-marketing/backend_apis/app/main.py [0:0]
def post_consumer_insights(data: ConsumerInsightsRequest
) -> ConsumerInsightsResponse:
"""Query Vertex AI Search and return top 10 results.
Parameters:
query: str
Returns:
results: list
"""
datastore_location = "global"
results = []
try:
search_results = utils_search.search(
search_query=data.query,
project_id=project_id,
location=datastore_location,
search_engine_id=vertexai_search_datastore,
serving_config_id="default_config",
search_client=search_client)
except Exception as e:
raise HTTPException(
status_code=400,
detail="Something went wrong. Please try again.")
else:
llm_summary = search_results.summary.summary_text
for search_result in search_results.results:
search_result_dict = Message.to_dict(search_result)
document = search_result_dict.get("document", {})
struct_data = document.get("derived_struct_data",{})
title = struct_data.get("title", "")
link = struct_data.get("link", "")
snippets = struct_data.get("snippets", [])
if len(snippets) > 0:
snippet = snippets[0].get("snippet", "")
html_snippet = snippets[0].get("htmlSnippet", "")
result = {
"title": title,
"link": link,
"snippet": snippet,
"html_snippet": html_snippet
}
results.append(result)
return ConsumerInsightsResponse(
results=results,
llm_summary=llm_summary
)