projects/conversational-commerce-agent/conversational-agent-examples/assets/apparel-search-cf/main.py (460 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The original author of this file is unreachable, and there is no
# test-case for this file, hence it is not feasible to lint this file
# pylint: skip-file
""" Google Cloud Function to return JSON or HTML from search response
This sample uses the Retail API with client libraries
This can be used for doing AJAX/client side calls to get search results
and render in a div.
Configure the GCF to use a service account which has Retail Editor Role
"""
import logging
import os
import random
import tomllib
import uuid
from datetime import datetime, timedelta

import flask
import google.auth
import google.auth.transport.requests
from firebase_admin import db, initialize_app
from firebase_functions import https_fn
from flask import request
from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import firestore
from google.cloud import retail_v2
from google.cloud.aiplatform import telemetry
from google.protobuf.json_format import MessageToDict

from utils import shopping_cart
# User-agent label handed to telemetry.tool_context_manager() around API calls.
_USER_AGENT = "cloud-solutions/conversational-shopping-agent-v0.0.1"

# Location of the TOML configuration; override with CONFIG_TOML_PATH.
TOML_PATH = os.getenv("CONFIG_TOML_PATH", "config.toml")
with open(TOML_PATH, "rb") as f:
    config = tomllib.load(f)

initialize_app()
app = flask.Flask(__name__)

# Create a client to interact with Firestore.
# NOTE: this rebinds the name `db` that the import block takes from
# firebase_admin; from here on `db` is the Firestore client.
db = firestore.Client(
    project=config["gcp"]["project_id"],
    database=config["gcp"]["apparel_firebase_db"],
)

# GCP Project Number. Coerced to str so that an unquoted (integer) value in
# the TOML file still works with the string concatenations that build
# resource names below and in the request handlers.
PROJECT_NUMBER = str(config["gcp"]["project_number"])

# Retail API auth scopes
credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
    quota_project_id=config["gcp"]["project_id"],
)
# Transport request object used by main() to refresh `credentials`.
auth_req = google.auth.transport.requests.Request()

client = retail_v2.SearchServiceClient(
    credentials=credentials,
    # The label must be passed by keyword: the first positional parameter of
    # ClientInfo is `python_version`, not `user_agent`.
    client_info=ClientInfo(
        user_agent="cloud-solutions/conversational-commerce-agent-v0.0.1"
    ),
)
predict_client = retail_v2.PredictionServiceClient(credentials=credentials)
product_client = retail_v2.ProductServiceClient(credentials=credentials)

# Resource-name prefix for products; callers append the product id.
product_name = (
    "projects/"
    + PROJECT_NUMBER
    + "/locations/global/catalogs/default_catalog/branches/1/products/"
)
# For now for demo purposes we are just returning the same example reviews
# regardless of the product. "${title}" is a placeholder that get_reviews()
# replaces with the product title; "product_id" and "title" are filled in
# per product as well.
#
# The texts use implicit string concatenation instead of backslash
# continuations inside the literal, so the indentation of the source lines
# can never leak into the review text.
REVIEWS = [
    {
        "product_id": "",
        "user": "Briene Sanje",
        "rating": 3,
        "desc": (
            "the ${title} fit real well at first, I suppose. "
            "However after a few hours into the night "
            "I started to wish I had gone up a size or two. "
            "Every couple minutes I had to "
            "re-adjust. It wasn't a cute look on the dance floor haha. "
            "Wish I had "
            "size up, otherwise I've no other regrets."
        ),
        "title": "",
    },
    {
        "product_id": "",
        "user": "Jenny Rahme",
        "rating": 5,
        "desc": (
            "As a first-time buyer, I must say, "
            "I am head over heels in love with my "
            "purchase! The moment I delicately wear the ${title}, "
            "I felt an instant "
            "connection, as if it was meant to be. "
            "What impressed me the most was the comfort it provided "
            "throughout the day."
        ),
        "title": "",
    },
    {
        "product_id": "",
        "user": "Nadine Bhakta",
        "rating": 4,
        "desc": (
            "I recently had the pleasure of wearing the most enchanting "
            "${title} "
            "for a special occasion, and let me tell you, "
            "heads turned and compliments poured "
            "in all night long! "
            "The fit was mostly great."
        ),
        "title": "",
    },
    {
        "product_id": "",
        "user": "Anjali Gupta",
        "rating": 5,
        "desc": (
            "I recently purchased the ${title} from this store, "
            "and I must say, it's "
            "absolutely stunning! The quality is premium, and the work is "
            "a testament to the remarkable craftsmanship. "
            "The product is beautiful, and the rich color hasn't faded "
            "a bit even after a few wears."
        ),
        "title": "",
    },
]
# In-memory stand-in for a user profile store (demo only). The dict is read
# by /get_user_info and modified in place by /update_user_info.
user_info = dict(
    delivery_address="638 Maple Street, Apt 11, Cupertino, CA 95014",
    payment_info="**********4111",
    contact_number="416-555-6704",
    email="poetry_reader456@gmail.com",
)
@app.post("/search")
def search():
    """
    Return up to three catalog products matching a search query.

    Request JSON:
        search: the user's query text (required).
        offset: index of the first product to return; optional, defaults
            to 0. Negative values are treated as 0.

    Returns:
        A JSON custom-template payload with the selected products, a JSON
        message when the offset is past the last result (including when the
        search has no results at all), or an empty JSON object when the
        Retail API call or the post-processing fails.
    """
    app.logger.debug("REACHED /SEARCH")
    request_json = request.get_json(silent=False)
    app.logger.debug("REQUEST: %s", request_json)

    # Capture the user's search query.
    query = request_json["search"]
    num_products = 3  # number of products to display in the UI
    # A search model name which was configured while creating product catalog
    placement = "default_search"

    # Retail API search request. A fresh random visitor id is used per call.
    search_request = {
        "placement": (
            f"projects/{PROJECT_NUMBER}"
            f"/locations/global/catalogs/default_catalog/placements/{placement}"
        ),
        "query": query,
        "visitor_id": str(uuid.uuid4()),
        "query_expansion_spec": {"condition": "AUTO"},
    }

    try:
        # index number from which the products should be returned
        start_index = max(int(request_json.get("offset") or 0), 0)

        with telemetry.tool_context_manager(_USER_AGENT):
            # Retail Search API call
            response = client.search(search_request)
            res = MessageToDict(response._pb)

        # MessageToDict leaves out empty repeated fields, so "results" is
        # absent (not an empty list) when nothing matched.
        results = res.get("results", [])
        app.logger.debug("RAW RESULT: %s", results)

        # extract products based on the offset index from the returned products
        if start_index >= len(results):
            return flask.jsonify(
                {"message": "No more products available to show"}
            )
        # Slicing clamps the end of the window to the list length.
        products = results[start_index : start_index + num_products]

        # remove unnecessary fields from product's data
        data = get_minimal_payload(products)
        app.logger.debug("RESULT: %s", data)

        # Transform the product's data into a customer template format to
        # display in the UI
        return flask.jsonify(generate_custom_template(data))
    except Exception as e:
        # Deliberate best-effort: the caller gets an empty object on failure.
        app.logger.warning("Retail Search Exception: %s", e)
        return flask.jsonify({})
@app.post("/get_product_details")
def get_product_details():
    """
    Fetch product details to answer customer's queries related to products.

    Request JSON:
        product_ids: list of product ids; duplicates are ignored.

    Returns:
        JSON object {"products": [...]} with one reduced product entry per
        distinct id, in the order the ids first appear in the request.
    """
    app.logger.debug("REACHED /GET_PRODUCT_DETAILS")
    request_json = request.get_json(silent=False)
    app.logger.debug("REQUEST: %s", request_json)

    # dict.fromkeys removes duplicates but, unlike set(), keeps the request
    # order, so the response order is stable from call to call.
    product_ids = list(dict.fromkeys(request_json["product_ids"]))
    app.logger.debug("PRODUCT_IDS: %s", product_ids)

    data = []
    for product_id in product_ids:
        # Retail API call
        with telemetry.tool_context_manager(_USER_AGENT):
            product = product_client.get_product(name=product_name + product_id)
        # Cleared before serialising; presumably to keep this field out of
        # the payload -- TODO confirm it is still needed.
        product.retrievable_fields = None
        # Wrap under "product" so the entry has the same shape as a search
        # result, which is what get_minimal_payload reads.
        data.append({"product": MessageToDict(product._pb)})
    app.logger.debug("RAW RESULT: %s", data)

    # Keep only necessary fields from product's data
    product_details = get_minimal_payload(data)
    app.logger.debug("PRODUCTS: %s", product_details)
    return flask.jsonify({"products": product_details})
@app.post("/similar")
def similar():
    """
    Find similar products using Retail Recommendation API.

    Request JSON:
        product_id: id of the product to find similar items for.

    Returns:
        A JSON custom-template payload with the recommended products, or an
        empty JSON object when there are no recommendations or the call or
        post-processing fails.
    """
    app.logger.debug("REACHED /SIMILAR")
    request_json = request.get_json(silent=False)
    app.logger.debug("REQUEST: %s", request_json)

    product_id = request_json["product_id"]
    page_size = 2
    # A recommendation model created during product catalog creation.
    placement = "similar-item"
    user_event = {
        "event_type": "detail-page-view",
        "visitor_id": str(uuid.uuid4()),
        "product_details": [{"product": {"id": product_id}}],
    }
    # Retail Recommendation API request
    predict_request = {
        "placement": (
            f"projects/{PROJECT_NUMBER}"
            "/locations/global/catalogs/default_catalog/servingConfigs/"
            f"{placement}"
        ),
        "user_event": user_event,
        "page_size": page_size,
        "filter": "filterOutOfStockItems",
        "params": {"returnProduct": True, "returnScore": True},
    }

    try:
        # Retail Recommendation API call
        with telemetry.tool_context_manager(_USER_AGENT):
            response = predict_client.predict(predict_request)
            res = MessageToDict(response._pb)
        app.logger.debug("RAW RESPONSE: %s", res)

        # MessageToDict leaves out empty repeated fields, so "results" is
        # absent when there are no recommendations; treat that as an empty
        # list instead of letting a KeyError fall into the except below.
        raw_results = res.get("results", [])
        app.logger.debug("RAW RESULT: %s", raw_results)

        # Remove unnecessary 'metadata' parent nodes to normalize output
        # between /search and /similar results.
        data = get_minimal_payload(
            [result["metadata"] for result in raw_results]
        )
        app.logger.debug("RESULT: %s", data)

        # Transform product's data into custom template format to display in
        # UI; no products means an empty object.
        response = generate_custom_template(data) if data else {}
        return flask.jsonify(response)
    except Exception as e:
        # Deliberate best-effort: the caller gets an empty object on failure.
        app.logger.warning("Retail Search Exception: %s", e)
        return flask.jsonify({})
@app.post("/get_reviews")
def get_reviews():
    """
    Retrieve product reviews (currently DUMMY reviews taken from REVIEWS).

    Request JSON:
        shown_products: list of objects with "id" and "title".

    The sample reviews are split evenly between the shown products. When
    there are more products than sample reviews, the samples are reused in
    rotation so that every product still gets one review. An empty product
    list yields an empty review list.

    Returns:
        A JSON "review-template" custom-template payload, with the same
        reviews also available under the top-level "reviews" key.
    """
    app.logger.debug("REACHED /GET_REVIEWS")
    request_json = request.get_json(silent=False)
    app.logger.debug("REQUEST: %s", request_json)

    # Eventually this should look up reviews based on the product ID provided.
    shown_products = request_json["shown_products"]

    reviews = []
    if shown_products and REVIEWS:
        # At least one review per product, even when the products outnumber
        # the samples (integer division would otherwise give 0).
        reviews_per_product = max(1, len(REVIEWS) // len(shown_products))
        app.logger.debug("reviews_per_product: %s", reviews_per_product)

        # splitting the reviews for the shown products
        for idx, product in enumerate(shown_products):
            first = idx * reviews_per_product
            for step in range(reviews_per_product):
                # Modulo wraps around once the samples are used up.
                review = REVIEWS[(first + step) % len(REVIEWS)].copy()
                review["product_id"] = product["id"]
                review["title"] = product["title"]
                review["desc"] = review["desc"].replace(
                    "${title}", product["title"]
                )
                reviews.append(review)
    app.logger.debug("REVIEWS: %s", reviews)

    # Transforming reviews into a customer template format to display in UI
    response = generate_custom_template(reviews, template="review-template")
    response["reviews"] = reviews
    return flask.jsonify(response)
@app.post("/get_delivery_date")
def get_delivery_date():
    """
    Attach an estimated delivery date to every item of the shopping cart.

    One date, a random 3 to 7 days from now, is computed per request and
    written to both "delivery_date" and "earliest_delivery_date" of each
    cart item (formatted like "January 05, 2025"). The items from the
    request are updated in place and returned under "shopping_cart".
    """
    app.logger.debug("REACHED /GET_DELIVERY_DATE")
    request_json = request.get_json(silent=False)
    app.logger.debug("REQUEST: %s", request_json)

    cart_items = request_json["shopping_cart"]
    app.logger.debug("SHOPPING CART: %s", cart_items)

    # set estimated delivery date between next 3 to 7 days
    today = datetime.now()
    lead_time = timedelta(days=random.randint(3, 7))
    date_label = (today + lead_time).strftime("%B %d, %Y")

    dated_cart = []
    for entry in cart_items:
        # The same estimate serves as both the current and the earliest date.
        for field in ("delivery_date", "earliest_delivery_date"):
            entry[field] = date_label
        dated_cart.append(entry)

    app.logger.debug("CART WITH DELIVERY DATES: %s", dated_cart)
    return flask.jsonify({"shopping_cart": dated_cart})
@app.post("/store_delivery_date")
def store_delivery_date():
    """
    Store the user's preferred delivery dates in the shopping cart.

    Request JSON:
        shopping_cart: list of cart items, each with "id", "title" and
            "earliest_delivery_date".
        preferred_delivery_date: list of objects with "id" and
            "preferred_delivery_date".

    Dates use the format "%B %d, %Y" (e.g. "January 05, 2025"). A preferred
    date is accepted when it is on or after the item's earliest delivery
    date; the cart item's "delivery_date" is then updated. Each processed
    entry of preferred_delivery_date gets a "status" ("success" or "fail")
    and a "reason". Entries without an id or a date are left untouched.
    Entries whose product is not in the cart, or whose dates cannot be
    parsed, are marked "fail" rather than aborting the request.

    Returns:
        JSON object with the (possibly updated) "shopping_cart" and the
        annotated "preferred_delivery_date" list.
    """
    date_format = "%B %d, %Y"

    app.logger.debug("REACHED /STORE_DELIVERY_DATE")
    request_json = request.get_json(silent=False)
    app.logger.debug("REQUEST: %s", request_json)

    shopping_cart = request_json["shopping_cart"]
    preferred_delivery_dates = request_json["preferred_delivery_date"]
    app.logger.debug("SHOPPING CART: %s", shopping_cart)
    app.logger.debug("PREFERRED DELIVERY DATES: %s", preferred_delivery_dates)

    for item in preferred_delivery_dates or []:
        app.logger.debug("PREFERRED DELIVERY DATE ITEM: %s", item)
        if (
            not item
            or not item.get("id")
            or not item.get("preferred_delivery_date")
        ):
            continue

        # Look the product up by reference so the cart itself is updated.
        product = next(
            (entry for entry in shopping_cart if entry.get("id") == item["id"]),
            None,
        )
        if product is None:
            item["reason"] = (
                "The product "
                + str(item["id"])
                + " was not found in the shopping cart"
            )
            item["status"] = "fail"
            continue

        try:
            preferred = datetime.strptime(
                item["preferred_delivery_date"], date_format
            )
            earliest = datetime.strptime(
                product["earliest_delivery_date"], date_format
            )
        except (KeyError, TypeError, ValueError):
            item["reason"] = (
                "The delivery dates for the "
                + str(product.get("title", item["id"]))
                + " could not be read"
            )
            item["status"] = "fail"
            continue

        # verify the preferred delivery date is not before the earliest
        # delivery date
        if preferred >= earliest:
            # updating product delivery date in the shopping cart
            product["delivery_date"] = item["preferred_delivery_date"]
            item["reason"] = (
                "The delivery date for the "
                + product["title"]
                + " has been changed to "
                + product["delivery_date"]
            )
            item["status"] = "success"
        else:
            item["reason"] = (
                "The preferred delivery date ("
                + item["preferred_delivery_date"]
                + ") is before the earliest delivery date ("
                + product["earliest_delivery_date"]
                + ") for the "
                + product["title"]
            )
            item["status"] = "fail"

    app.logger.debug(
        "DELIVERY DATES CHANGE STATUS: %s", preferred_delivery_dates
    )
    return flask.jsonify(
        {
            "shopping_cart": shopping_cart,
            "preferred_delivery_date": preferred_delivery_dates,
        }
    )
@app.route("/place_order", methods=["POST"])
def place_order():
    """
    Place the order for the shopping cart items.

    Request JSON:
        products: list of cart products to order.

    A missing, unparsable or non-object body is treated like an empty cart.

    Returns:
        On success the stored order record (order_id, order_status
        "confirmed", order_created_on, products). Otherwise a JSON object
        with order_status "not_placed" and a "reason".
    """
    app.logger.debug("REACHED /PLACE_ORDER")
    # silent=True returns None instead of raising when the body is not JSON.
    request_json = request.get_json(silent=True)
    app.logger.debug("REQUEST: %s", request_json)

    products = (
        request_json.get("products", [])
        if isinstance(request_json, dict)
        else []
    )
    app.logger.debug("SHOPPING CART PRODUCTS: %s", products)
    if not products:
        app.logger.debug("EMPTY CART: %s", products)
        return flask.jsonify(
            {"order_status": "not_placed", "reason": "Shopping cart is empty!"}
        )

    # Short order id: the first 8 hex characters of a random UUID.
    order_id = uuid.uuid4().hex[:8]
    order_created_on = datetime.utcnow().isoformat() + "Z"
    # Eventually this response should be returned from an API after
    # successful order placement
    order_data = {
        "order_id": order_id,
        "order_status": "confirmed",
        "order_created_on": order_created_on,
        "products": products,
    }
    try:
        # store the order data in firestore
        with telemetry.tool_context_manager(_USER_AGENT):
            db.collection("orders").document(order_id).set(order_data)
        app.logger.debug("ORDER PLACED: %s", order_data)
        return flask.jsonify(order_data)
    except Exception as e:
        app.logger.warning("PLACE ORDER EXCEPTION: %s", e)
        return flask.jsonify(
            {"order_status": "not_placed", "reason": "Internal Server Error!"}
        )
@app.route("/get_user_info", methods=["GET"])
def get_user_info():
    """
    Return the user's personal info as JSON.

    NOTE: this serves the DUMMY module-level user_info dict. In an actual
    app the data should come from the user's account. Any request body is
    only logged.
    """
    app.logger.debug("REACHED /USER_INFO")
    body = request.get_json(silent=True)
    app.logger.debug("REQUEST: %s", body)
    profile_response = flask.jsonify(user_info)
    return profile_response
@app.route("/update_user_info", methods=["POST"])
def update_user_info():
    """
    Update the user's info with the fields present in the request.

    NOTE: this updates the DUMMY module-level user_info dict; in an actual
    app the change should be made in the user's account.

    Request JSON (all optional): delivery_address, payment_info,
    contact_number, email. Fields that are not sent keep their current
    value. A new payment_info is masked before it is stored; the stored
    value is left as it is when no new one is sent.

    Returns:
        JSON with a success message and the updated user_info, or only a
        failure message when the update could not be applied. On failure
        user_info is not modified.
    """
    try:
        # Parse the incoming data
        incoming_data = request.get_json()

        # Build the complete new state first and apply it in one step, so a
        # failure half-way cannot leave user_info partially updated.
        updated = {
            "delivery_address": incoming_data.get(
                "delivery_address", user_info["delivery_address"]
            ),
            "payment_info": user_info["payment_info"],
            "contact_number": str(
                incoming_data.get("contact_number", user_info["contact_number"])
            ),
            "email": incoming_data.get("email", user_info["email"]),
        }
        # Mask only a newly supplied value; masking the stored (already
        # masked) value again would change its length on every update.
        if "payment_info" in incoming_data:
            updated["payment_info"] = encrypt_payment_info(
                str(incoming_data["payment_info"])
            )
        user_info.update(updated)

        # Respond with updated user information
        return flask.jsonify(
            {
                "message": "User information updated successfully.",
                "user_info": user_info,
            }
        )
    except Exception as e:
        app.logger.warning("USER INFO UPDATE EXCEPTION: %s", e)
        return flask.jsonify(
            {"message": "User information did not update successfully."}
        )
def encrypt_payment_info(payment_info):
    """
    Mask a payment string for display.

    Despite the name nothing is encrypted: the result is twelve asterisks
    followed by the last four characters of payment_info (fewer when the
    input is shorter than four characters).
    """
    visible_tail = payment_info[-4:]
    mask = "*" * 12
    return mask + visible_tail
@app.get("/no_op")
def no_op():
    """
    Healthcheck endpoint: logs the hit and answers with JSON true.
    """
    app.logger.debug("REACHED /NO_OP")
    alive = True
    return flask.jsonify(alive)
def get_minimal_payload(resp_json):
    """
    Return only the necessary fields from each product's data.

    Parameters:
        resp_json: iterable of result dicts. Each has a "product" dict and
            optionally a top-level "id", which takes precedence over the
            product's own id.

    Returns:
        A list of {"product": {...}} dicts holding id, title, name,
        priceInfo, images (at most the first image) and description.
        description falls back to the title. A product without images gets
        an empty list and a product without priceInfo gets an empty dict,
        so that one sparse product does not fail the whole response.

    Raises:
        KeyError: if an item has no "product", or the product has no
            "title" or "name" (or no id anywhere).
    """
    results = []
    for item in resp_json:
        product = item["product"]
        # Absent keys are expected for fields that are empty on the
        # product, hence the tolerant lookups for images and priceInfo.
        images = product.get("images") or []
        results.append(
            {
                "product": {
                    "id": item["id"] if "id" in item else product["id"],
                    "title": product["title"],
                    "name": product["name"],
                    "priceInfo": product.get("priceInfo", {}),
                    "images": images[:1],
                    "description": product.get(
                        "description", product["title"]
                    ),
                }
            }
        )
    return results
def generate_custom_template(payload, template="retail-template"):
    """
    Wrap data in a custom-template rich-content response.

    Parameters:
        payload: The data being returned; placed under "items".
        template: Name of the custom template used to render the rich
            content. Default is retail-template.
    Returns:
        response: A dict whose "payload" field holds a "richContent" list
            containing one row with one custom_template element.
    """
    element = {
        "type": "custom_template",
        "name": template,
        "payload": {"items": payload},
    }
    row = [element]
    rich_content = [row]
    return {"payload": {"richContent": rich_content}}
@app.post("/search_filter")
def search_filter():
    """
    Return up to three catalog products matching a query and a filter.

    Request JSON:
        search: the user's query text (required).
        filter: filter expression in which "^" stands for a double quote
            (required).
        offset: index of the first product to return; optional, defaults
            to 0. Negative values are treated as 0.

    When the offset is 0 and there are more than three results, a random
    window of three results is returned so that repeated searches do not
    always show the same top products (b/405944679). When there are three
    results or fewer, all of them are returned.

    Returns:
        A JSON custom-template payload with the selected products, a JSON
        message when the offset is past the last result (including when the
        search has no results at all), or an empty JSON object when the
        Retail API call or the post-processing fails.
    """
    app.logger.debug("REACHED /SEARCH_FILTER")
    request_json = request.get_json(silent=False)
    app.logger.debug("REQUEST: %s", request_json)

    # Capture the user's search query.
    query = request_json["search"]
    # Workaround that the conversation Agent Tool
    # cannot parse '\"' in request body.
    filter_expr = request_json["filter"].replace("^", "\"")

    num_products = 3  # number of products to display in the UI
    # A search model name which was configured while creating product catalog
    placement = "default_search"

    # Retail API search request. A fresh random visitor id is used per call.
    search_request = {
        "placement": (
            f"projects/{PROJECT_NUMBER}"
            f"/locations/global/catalogs/default_catalog/placements/{placement}"
        ),
        "query": query,
        "visitor_id": str(uuid.uuid4()),
        "query_expansion_spec": {"condition": "AUTO"},
        "filter": filter_expr,
    }

    try:
        # index number from which the products should be returned
        start_index = max(int(request_json.get("offset") or 0), 0)

        # Retail Search API call
        with telemetry.tool_context_manager(_USER_AGENT):
            response = client.search(search_request)
            res = MessageToDict(response._pb)

        # MessageToDict leaves out empty repeated fields, so "results" is
        # absent (not an empty list) when nothing matched.
        results = res.get("results", [])
        app.logger.debug("RAW RESULTS: %s", results)

        # extract products based on the offset index from the returned products
        if start_index >= len(results):
            return flask.jsonify(
                {"message": "No more products available to show"}
            )

        if len(results) <= num_products:
            # Everything fits in one window; show it from the start.
            start_index = 0
        elif start_index == 0:
            # randint is inclusive on both ends; the last valid window
            # starts at len(results) - num_products.
            start_index = random.randint(0, len(results) - num_products)

        # Slicing clamps the end of the window to the list length.
        products = results[start_index : start_index + num_products]

        # remove unnecessary fields from product's data
        data = get_minimal_payload(products)
        app.logger.debug("RESULT: %s", data)

        # Transform the product's data into a customer template format to
        # display in the UI
        response = generate_custom_template(data)
        app.logger.debug("response: %s", response)
        return flask.jsonify(response)
    except Exception as e:
        # Deliberate best-effort: the caller gets an empty object on failure.
        app.logger.warning("Retail Search Exception: %s", e)
        return flask.jsonify({})
@app.post("/shopping_cart")
def show_shopping_cart():
    """
    Pass the request JSON to the shopping_cart helper imported from utils
    and return whatever it produces.

    NOTE(review): the shape of the helper's return value is not visible in
    this file; it is handed back to Flask unchanged.
    """
    app.logger.debug("REACHED /shopping_cart")
    cart_request = request.get_json(silent=False)
    app.logger.debug("REQUEST: %s", cart_request)
    cart_response = shopping_cart(cart_request)
    return cart_response
@https_fn.on_request()
def main(req: https_fn.Request) -> https_fn.Response:
    """
    Function entry point: refresh the module-level credentials, then let
    the Flask app dispatch the incoming request to the matching route.
    """
    credentials.refresh(auth_req)
    # Dispatch inside a Flask request context built from the WSGI environ
    # of the incoming request.
    request_ctx = app.request_context(req.environ)
    with request_ctx:
        flask_response = app.full_dispatch_request()
        return flask_response