in backend-apis/app/routers/p5_contact_center_analyst.py [0:0]
def search_reviews(data: SearchReviewsRequest) -> SearchReviewsResponse:
    """
    # Search for reviews on Vertex AI Search Datastore

    ## Request Body [SearchReviewsRequest]:

    **query**: *string*
    - User input to search the datastore

    **user_pseudo_id**: *string*
    - User unique ID

    **conversation_id**: *string*
    - ID of the Vertex AI Search Conversation
    - When empty, a new conversation is created

    **rating**: *list*
    - Filter field for conversation rating
    - Allowed values
      - 1
      - 2
      - 3
      - 4
      - 5

    **sentiment**: *list*
    - Filter field for conversation sentiment
    - Allowed values
      - positive
      - negative
      - neutral

    **category**: *list*
    - Filter field for conversation category
    - Allowed values
      - Bath Robe
      - Bath Towel Set
      - Bed
      - Bookcase
      - Chair
      - Console Table
      - Dining Table
      - Game Table
      - Grill
      - Office Chair
      - Ottoman
      - Outdoor Heater
      - Pool
      - Sofa
      - Tool Cabinet

    **customer_id**: *string*
    - Filter field for review customer_id

    **product_id**: *string*
    - Filter field for review product_id

    Note: the filter fields are accepted and assembled into a filter
    expression, but the expression is not sent to the search call yet.

    ## Response Body [SearchReviewsResponse]:

    **responses**: *dictionary*
    - Search results, including information about the review
    - Keys: `summary`, `user_input`, `search_results`

    **conversation_id**: *string*
    - ID of the Vertex AI Search Conversation

    ## Errors:

    **400**
    - Creating the conversation or running the search raised a
      `GoogleAPICallError`
    """
    datastore_id = config["search-persona5"]["reviews_datastore_id"]

    # Reuse the caller's conversation when given; otherwise start a new one.
    if data.conversation_id:
        conversation_id = data.conversation_id
    else:
        try:
            conversation_id = utils_search.create_new_conversation(
                user_pseudo_id=data.user_pseudo_id,
                datastore_id=datastore_id,
            ).name
        except GoogleAPICallError as e:
            raise HTTPException(
                status_code=400,
                detail=f"Error creating a Vertex AI Conversation session. {e}",
            ) from e

    # Build one `field: ANY("v1","v2")` clause per active filter.
    filter_clauses = []
    if data.customer_id:
        filter_clauses.append(f'customer_id: ANY("{data.customer_id}")')
    if data.product_id:
        filter_clauses.append(f'product_id: ANY("{data.product_id}")')
    for field_name in ("rating", "sentiment", "category"):
        values = getattr(data, field_name)
        if values:
            # The f-string conversion keeps integer ratings from breaking
            # the join.
            quoted_values = ",".join(f'"{value}"' for value in values)
            filter_clauses.append(f"{field_name}: ANY({quoted_values})")
    # Clauses must be combined with an explicit boolean operator.
    # NOTE: not passed to the search call yet (see the commented-out kwarg).
    search_filter = " AND ".join(filter_clauses)

    try:
        results = utils_search.vertexai_search_multiturn(
            search_query=data.query,
            conversation_id=conversation_id,
            # search_filter=search_filter,  # Uncomment when available
            datastore_id=datastore_id,
        )
    except GoogleAPICallError as e:
        raise HTTPException(
            status_code=400,
            detail=f"Error searching Vertex AI datatore. {e}",
        ) from e
    results = Message.to_dict(results)

    # Nested sections may be missing or empty in the converted dict, so fall
    # back to empty containers instead of calling `.get` on a string.
    reply = results.get("reply") or {}
    summary = reply.get("summary") or {}

    review_fields = (
        "sentiment",
        "customer_id",
        "customer_email",
        "product_id",
        "category",
        "rating",
        "title",
        "review",
    )
    search_results = []
    for result in results.get("search_results") or []:
        document = result.get("document") or {}
        struct_data = document.get("struct_data") or {}
        # A review lacking a field yields None for that field rather than
        # failing the whole request.
        review = {"id": result.get("id")}
        review.update(
            {field: struct_data.get(field) for field in review_fields}
        )
        search_results.append(review)

    responses = {
        "summary": summary.get("summary_text", ""),
        "user_input": data.query,
        "search_results": search_results,
    }
    return SearchReviewsResponse(
        responses=responses, conversation_id=conversation_id
    )