chat-client/main.py (496 lines of code) (raw):
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import os
import re
from concurrent import futures
from typing import Any, Mapping
import client_utils
import flask
import functions_framework
from google.cloud import firestore
SUBSCRIBE_COMMAND_ID = 1
SUBSCRIPTIONS_COMMAND_ID = 2
DB = firestore.Client(os.environ.get("GCP_PROJECT_ID"))
@functions_framework.http
def chat_app(req: flask.Request) -> Mapping[str, Any]:
req_json = req.get_json()
print(f"Received request: {req_json}")
# Handle chat UI
if req.method == "POST" and req.path == "/":
chatEvent = req_json["chat"]
if "messagePayload" in chatEvent:
return handleMessage(req_json)
# Handle app commands
elif "appCommandPayload" in chatEvent:
appCommandMetadata = chatEvent["appCommandPayload"]["appCommandMetadata"]
if appCommandMetadata["appCommandType"] == "SLASH_COMMAND":
if appCommandMetadata["appCommandId"] == SUBSCRIBE_COMMAND_ID:
return openInitialDialog(req_json)
elif appCommandMetadata["appCommandId"] == SUBSCRIPTIONS_COMMAND_ID:
return returnSubscriptions(req_json)
# Handle added to space
elif "addedToSpacePayload" in chatEvent:
return handleMessage(req_json)
# Handle app removal from space
elif "removedFromSpacePayload" in chatEvent:
print("Unsubscribing from space")
space_id = req_json["chat"]["removedFromSpacePayload"]["space"]["name"]
subscriptions_ref = DB.collection("product_space_subscriptions")
product_doc_ref = subscriptions_ref.document(space_id.replace("/", "_"))
products_doc = product_doc_ref.get()
if products_doc.exists:
products = products_doc.to_dict().get("products_subscribed", [])
categories = products_doc.to_dict().get("categories_subscribed", [])
with futures.ThreadPoolExecutor() as executor:
unsubscribe_space_product_futures = [
executor.submit(
unsubscribe_space_product,
space_id,
DB.collection("space_product_subscriptions"),
product,
)
for product in products
]
unsubscribe_space_blogs_futures = [
executor.submit(
unsubscribe_space_blogs,
space_id,
DB.collection("space_blog_subscriptions"),
category,
)
for category in categories
]
futures.wait(
unsubscribe_space_product_futures + unsubscribe_space_blogs_futures
)
product_doc_ref.delete()
print(
f"Unsubscribed space {space_id} from all products and categories."
)
return ("Done", 200)
# Handle button clicks
elif "buttonClickedPayload" in chatEvent:
if (
req_json["commonEventObject"]["parameters"]["actionName"]
== "openInitialDialog"
):
return openInitialDialog(req_json)
elif (
req_json["commonEventObject"]["parameters"]["actionName"]
== "submitDialog"
):
print(f"Submitting dialog: {submitDialog(req_json)}")
return submitDialog(req_json)
# Handle Pub/Sub push messages
elif req.method == "POST" and req.path == "/messages":
return handle_pubsub_message(req)
print("Reached an unexpected state.")
def handleMessage(event):
return {
"hostAppDataAction": {
"chatDataAction": {
"createMessageAction": {
"message": {
"text": "To add a subscription in this space, use the `/subscribe` command!",
}
}
}
}
}
CATEGORY_MAP = {
"All Data Products": client_utils.google_cloud_data_products,
"All AI Products": client_utils.google_cloud_ai_products,
"All App Mod Products": client_utils.google_cloud_app_mod_products,
"All Security Products": client_utils.google_cloud_security_products,
}
BLOG_CATEGORY_MAP = {
"All Data Blogs": getattr(
client_utils, "data_categories", []
), # Use getattr for safety
# "All AI Blogs": client_utils.ai_blogs_categories, # Example
}
def _get_expanded_subscription_set(subscribed_items, category_map):
initial_set = set(subscribed_items)
expanded_set = set(initial_set) # Start with explicit items
for category_tag, category_list in category_map.items():
if category_tag in initial_set:
# If the user subscribed to a category tag, add all items from that category
expanded_set.update(category_list)
return expanded_set
# Helper function to get members excluding the tag itself
def get_members_only(category_tag, category_map):
"""Gets items in a category, excluding the category tag itself."""
full_list = category_map.get(category_tag, [])
# Using str() ensures comparison works if lists somehow contain non-strings
# This filters out the tag itself from the list of members.
return {str(item) for item in full_list if str(item) != str(category_tag)}
def openInitialDialog(request_json):
"""
Opens the initial subscription dialog. If a category tag (e.g., "All App Mod Products")
is present in the saved subscriptions (which also includes individual members),
only the category tag itself is marked selected in the UI, suppressing the selection
of the individual members of that category (even if they are in the saved list).
Other explicitly saved items remain selected.
Handles overrides ("All Products", "All Blogs").
"""
try:
# Default empty sets
products_subscribed_set = set()
categories_subscribed_set = set()
doc_exists = False
# Fetch subscriptions
space_name = request_json["chat"]["appCommandPayload"]["space"]["name"].replace(
"/", "_"
)
subscriptions_ref = DB.collection("product_space_subscriptions")
product_doc_ref = subscriptions_ref.document(space_name)
products_doc = product_doc_ref.get()
doc_exists = products_doc.exists
if doc_exists:
doc_data = products_doc.to_dict()
products_subscribed_set = set(doc_data.get("products_subscribed", []))
categories_subscribed_set = set(doc_data.get("categories_subscribed", []))
# --- Determine Overrides ---
all_products_override = "All Products" in products_subscribed_set
all_blogs_override = "All Blogs" in categories_subscribed_set
# --- Generate Product Dialog Items ---
notes = []
all_possible_products = getattr(client_utils, "google_cloud_products", [])
if all_products_override:
# Handle global override - only "All Products" is selected
for product in all_possible_products:
is_selected = product == "All Products"
notes.append(
{"text": product, "value": product, "selected": is_selected}
)
elif doc_exists:
# Identify active category tags and the individual products they cover
active_product_tags = {
tag for tag in CATEGORY_MAP if tag in products_subscribed_set
}
products_covered_by_active_tags = set()
if active_product_tags:
for tag in active_product_tags:
# Get members ONLY (exclude the tag)
products_covered_by_active_tags.update(
get_members_only(tag, CATEGORY_MAP)
)
# Determine selection state for each possible product
for product in all_possible_products:
is_selected = False # Default
# Rule 1: Is it an active category tag?
if product in active_product_tags:
is_selected = True
# Rule 2: Is it in the subscribed list AND NOT covered by an active tag?
elif (
product in products_subscribed_set
and product not in products_covered_by_active_tags
):
is_selected = True
notes.append(
{"text": product, "value": product, "selected": is_selected}
)
else:
# No document exists and no override, nothing is selected
for product in all_possible_products:
notes.append({"text": product, "value": product, "selected": False})
# --- Generate Blog Dialog Items (Similar Logic) ---
blogs = []
all_possible_categories = getattr(client_utils, "categories", [])
if all_blogs_override:
# Handle global override - only "All Blogs" is selected
for category in all_possible_categories:
is_selected = category == "All Blogs"
blogs.append(
{"text": category, "value": category, "selected": is_selected}
)
elif doc_exists:
# Identify active category tags and the individual blogs they cover
active_blog_tags = {
tag for tag in BLOG_CATEGORY_MAP if tag in categories_subscribed_set
}
blogs_covered_by_active_tags = set()
if active_blog_tags:
for tag in active_blog_tags:
# Get members ONLY (exclude the tag)
blogs_covered_by_active_tags.update(
get_members_only(tag, BLOG_CATEGORY_MAP)
)
# Determine selection state for each possible category
for category in all_possible_categories:
is_selected = False # Default
# Rule 1: Is it an active category tag?
if category in active_blog_tags:
is_selected = True
# Rule 2: Is it in the subscribed list AND NOT covered by an active tag?
elif (
category in categories_subscribed_set
and category not in blogs_covered_by_active_tags
):
is_selected = True
blogs.append(
{"text": category, "value": category, "selected": is_selected}
)
else:
# No document exists and no override, nothing is selected
for category in all_possible_categories:
blogs.append({"text": category, "value": category, "selected": False})
return client_utils.retrieve_dialog_response(notes, blogs)
except Exception as e:
# Add proper error handling/logging
space_name_for_error = "unknown"
try:
# Attempt to get space name for logging, but don't fail if request structure is unexpected
space_name_for_error = (
request_json.get("chat", {})
.get("appCommandPayload", {})
.get("space", {})
.get("name", "unknown")
.replace("/", "_")
)
except Exception:
pass # Ignore errors just trying to get the name for logging
print(f"Error opening initial dialog for space {space_name_for_error}: {e}")
# Return an error response or a default dialog
notes = [
{"text": p, "value": p, "selected": False}
for p in getattr(client_utils, "google_cloud_products", [])
]
blogs = [
{"text": c, "value": c, "selected": False}
for c in getattr(client_utils, "categories", [])
]
return client_utils.retrieve_dialog_response(
notes, blogs, error="Failed to load subscriptions."
)
def returnSubscriptions(request_json):
subscriptions_ref = DB.collection("product_space_subscriptions")
product_doc_ref = subscriptions_ref.document(
request_json["chat"]["appCommandPayload"]["space"]["name"].replace("/", "_")
)
products_doc = product_doc_ref.get()
notes = []
blogs = []
if products_doc.exists:
products = products_doc.to_dict().get("products_subscribed", [])
categories = products_doc.to_dict().get("categories_subscribed", [])
product_list = (
"\n".join(f"- {product}" for product in products) if products else "None"
)
category_list = (
"\n".join(f"- {category}" for category in categories)
if categories
else "None"
)
message_text = f"Current Subscriptions for this Space:\n\nProducts:\n{product_list}\n\nBlog categories:\n{category_list}"
return {
"hostAppDataAction": {
"chatDataAction": {
"createMessageAction": {
"message": {
"text": message_text,
}
}
}
}
}
else:
return {
"hostAppDataAction": {
"chatDataAction": {
"createMessageAction": {
"message": {
"text": "There are no subscriptions for this space yet. Use `/subscribe` to add some!",
}
}
}
}
}
def handle_templatized_notes_inputs(products):
initial_products_set = set(products)
all_products_flag = "All Products" in initial_products_set
if all_products_flag:
final_products_set = set(client_utils.google_cloud_products)
return sorted(list(final_products_set)), True
final_products_set = set(initial_products_set)
for category_tag, category_product_list in CATEGORY_MAP.items():
if category_tag in initial_products_set:
final_products_set.update(category_product_list)
return sorted(list(final_products_set)), False
def handle_templatized_blogs_inputs(categories):
all_blogs = "All Blogs" in categories
all_data_blogs = "All Data Blogs" in categories
if all_data_blogs and not all_blogs:
categories.extend(client_utils.data_categories)
if all_blogs:
categories = client_utils.categories
categories = list(set(categories))
return categories, all_blogs
def submitDialog(event):
chatUser = event["chat"]["user"]
products = []
categories = []
all_products = False
all_blogs = False
space_id = event["chat"]["buttonClickedPayload"]["space"]["name"]
if "formInputs" in event["commonEventObject"]:
if "contactType" in event["commonEventObject"]["formInputs"]:
products = event["commonEventObject"]["formInputs"]["contactType"][
"stringInputs"
]["value"]
products, all_products = handle_templatized_notes_inputs(products)
if "blogType" in event["commonEventObject"]["formInputs"]:
categories = event["commonEventObject"]["formInputs"]["blogType"][
"stringInputs"
]["value"]
categories, all_blogs = handle_templatized_blogs_inputs(categories)
with futures.ThreadPoolExecutor() as executor:
record_space_subscription_futures = [
executor.submit(record_space_subscription, space_id, product)
for product in products
]
record_space_blogs_futures = [
executor.submit(record_space_blogs, space_id, category)
for category in categories
]
futures.wait(record_space_subscription_futures + record_space_blogs_futures)
record_product_subscription(space_id, products, categories)
response = ""
if products: # More concise way to check if list is not empty
product_message = f"products: {', '.join(products)}"
else:
product_message = "no products" # Or a more appropriate message
if all_products:
product_message = "All Products"
if categories:
category_message = f"and categories: {', '.join(categories)}"
else:
category_message = "and no categories"
if all_blogs:
category_message = "and All Categories"
if products or categories: # Check if either products or categories are selected
response = f"😄🎉 Your request has been successfully submitted!\n\nThis space is now subscribed to {product_message} {category_message}."
else:
response = "😄🎉 Your request has been successfully submitted!\n\nThis space is now unsubscribed from any products or categories."
return {
"hostAppDataAction": {
"chatDataAction": {
"createMessageAction": {
"message": {"privateMessageViewer": chatUser, "text": response}
}
}
}
}
def record_space_blogs(space_id, category):
try:
subscriptions_ref = DB.collection("space_blog_subscriptions")
category_doc_ref = subscriptions_ref.document(category)
category_doc = category_doc_ref.get()
if category_doc.exists:
spaces_subscribed = category_doc.to_dict().get("spaces_subscribed", [])
if space_id not in spaces_subscribed:
spaces_subscribed.append(space_id)
category_doc_ref.update({"spaces_subscribed": spaces_subscribed})
else:
category_doc_ref.set(
{"category": category, "spaces_subscribed": [space_id]}
)
except Exception as e:
print(f"Error recording subscription: {e}", exc_info=True)
def record_space_subscription(space_id, product):
try:
subscriptions_ref = DB.collection("space_product_subscriptions")
product_doc_ref = subscriptions_ref.document(product.replace("/", ""))
product_doc = product_doc_ref.get()
if product_doc.exists:
spaces_subscribed = product_doc.to_dict().get("spaces_subscribed", [])
if space_id not in spaces_subscribed:
spaces_subscribed.append(space_id)
product_doc_ref.update({"spaces_subscribed": spaces_subscribed})
else:
product_doc_ref.set({"product": product, "spaces_subscribed": [space_id]})
except Exception as e:
print(f"Error recording subscription: {e}", exc_info=True)
def unsubscribe_space_blogs(space_id, space_blog_subscriptions_ref, category):
print(f"Unsubscribing space {space_id} from category {category}")
categories_doc_ref = space_blog_subscriptions_ref.document(category)
categories_doc_ref.update({"spaces_subscribed": firestore.ArrayRemove([space_id])})
def unsubscribe_space_product(space_id, space_product_subscriptions_ref, product):
print(f"Unsubscribing space {space_id} from product {product}")
product_doc_ref = space_product_subscriptions_ref.document(product.replace("/", ""))
product_doc_ref.update({"spaces_subscribed": firestore.ArrayRemove([space_id])})
def record_product_subscription(space_id, products, categories):
try:
subscriptions_ref = DB.collection("product_space_subscriptions")
space_doc_ref = subscriptions_ref.document(space_id.replace("/", "_"))
if space_doc_ref.get().exists:
previous_products = (
space_doc_ref.get().to_dict().get("products_subscribed", [])
)
previous_categories = (
space_doc_ref.get().to_dict().get("categories_subscribed", [])
)
if (len(previous_products) > len(products)) or (
len(previous_categories) > len(categories)
):
unsubscribed_products = list(set(previous_products) - set(products))
unsubscribed_categories = list(
set(previous_categories) - set(categories)
)
with futures.ThreadPoolExecutor() as executor:
unsubscribe_space_product_futures = [
executor.submit(
unsubscribe_space_product,
space_id,
DB.collection("space_product_subscriptions"),
product,
)
for product in unsubscribed_products
]
unsubscribe_space_blogs_futures = [
executor.submit(
unsubscribe_space_blogs,
space_id,
DB.collection("space_blog_subscriptions"),
category,
)
for category in unsubscribed_categories
]
futures.wait(
unsubscribe_space_product_futures + unsubscribe_space_blogs_futures
)
space_doc_ref.set(
{"products_subscribed": products, "categories_subscribed": categories}
)
except Exception as e:
print(f"Error recording subscription: {e}", exc_info=True)
def create_card_message(pubsub_message):
if "release_note" in pubsub_message:
release_note = pubsub_message.get("release_note")
title = f"New Release from {release_note.get('product')}"
subtitle = release_note.get("date")
# Replace html header <h3> with <b> formatting that Chat API supports
# https://developers.google.com/apps-script/add-ons/concepts/widgets#text_formatting
card_msg = re.sub(
r"<h3>(?P<header>.*?)</h3>", r"<b>\g<header></b>", release_note.get("html")
)
link = release_note.get("link")
elif "blog" in pubsub_message:
blog = pubsub_message.get("blog")
title = f"New Blog from {blog.get('category_name')}"
subtitle = blog.get("date")
card_msg = f"<b>{blog.get('title')}</b><br><br>{blog.get('summary')}"
link = blog.get("link")
else:
title = "An Error Occurred"
subtitle = ""
card_msg = f"An unexpected error occurred."
link = ""
return {
"thread": {"threadKey": f"{link}"},
"cardsV2": [
{
"card": {
"header": {
"title": f"{title}",
"subtitle": subtitle,
},
"sections": [
{
"widgets": [
{
"textParagraph": {
"text": card_msg,
}
},
{"divider": {}},
{
"decoratedText": {
"text": f"<a href='{link}'>Read more</a>",
"startIcon": {
"materialIcon": {
"name": "link",
}
},
}
},
],
}
],
}
}
],
}
def handle_pubsub_message(req: flask.Request):
try:
envelope = req.get_json()
if not envelope:
raise Exception("No Pub/Sub message received")
pubsub_message = json.loads(
base64.b64decode(envelope["message"]["data"]).decode("utf-8").strip()
)
print(f"Processing Pub/Sub message: {pubsub_message}")
space_id = pubsub_message.get("space_id")
card_message = create_card_message(pubsub_message)
print(f"Sending the following card to space {space_id}:\n\n{card_message}")
client_utils.send_chat_message(space_id, card_message)
return ("Done", 200)
except Exception as e:
# Return a 200 to acknowledge receipt of the message
# otherwise Pub/Sub will continue to trigger this function
print(f"Error handling Pub/Sub message: {e}")
return ("Done", 200)