check-blogs/main.py (135 lines of code) (raw):

# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import os from concurrent import futures from datetime import datetime import functions_framework import requests from blog_rss_urls import rss_urls from bs4 import BeautifulSoup from google import genai from google.cloud import firestore, pubsub_v1 from pytz import timezone client = genai.Client( vertexai=True, location="us-central1", ) firestore_client = firestore.Client(project=os.environ.get("GCP_PROJECT_ID")) batch_settings = pubsub_v1.types.BatchSettings( max_messages=100, # default 100 m max_bytes=1024, # default 1 MB max_latency=1, # default 10 ms ) publisher = pubsub_v1.PublisherClient(batch_settings) topic_path = publisher.topic_path( os.environ.get("GCP_PROJECT_ID"), os.environ.get("PUB_SUB_TOPIC_NAME") ) publish_futures = [] # Resolve the publish future in a separate thread. def callback(future: pubsub_v1.publisher.futures.Future) -> None: message_id = future.result() print(message_id) def summarize_blog(blog): try: prompt = f""" You are a helpful assistant that concisely summarizes Google Cloud blog posts. You will be provided with a blog post link. You will concisely summarize the blog post using bulleted lists if necessary. You will not include the blog title in the summary. You will leave out introductory text like 'This blog contains...' or 'Here is the summary...'. You will use the following Google Chat API text formatting options if necessary: {{ "formatting_options": {{ "Bold": {{ "example": "This is <b>bold</b>." }}, "Italics": {{ "example": "This is <i>italics</i>." }}, "Underline": {{ "example": "This is <u>underline</u>." }}, "Strikethrough": {{ "example": "This is <s>strikethrough</s>." }}, "Font color": {{ "example": "This is <font color=\"#FF0000\">red font</font>." }}, "Hyperlink": {{ "example": "This is a <a href=\"https://www.google.com\">hyperlink</a>." }}, "Time": {{ "example": "This is a time format: <time>2023-02-16 15:00</time>." }}, "Newline": {{ "example": "This is the first line. <br> This is a new line." }}, "Bulleted List": {{ "example": "<span>&nbsp;&nbsp;&nbsp;&nbsp;&#8226;&nbsp;&nbsp;</span>This is the first item in the list.<br><span>&nbsp;&nbsp;&nbsp;&nbsp;&#8226;&nbsp;&nbsp;</span>This is the second item in the list.<br><span>&nbsp;&nbsp;&nbsp;&nbsp;&#8226;&nbsp;&nbsp;</span>This is the third item in the list." }}, }} }} You will not mention anything about the formatting_options in the summary. Here is the blog post link to summarize: {blog.get("link")} """ response = client.models.generate_content( # https://ai.google.dev/gemini-api/docs/models#gemini-2.5-pro-preview-03-25 model="gemini-2.5-pro-preview-03-25", contents=prompt, ) if response.text: # Check if there's a valid response blog["summary"] = response.text return response.text else: print(f"Gemini returned an empty response for: {blog.get('link')}") return None except Exception as e: print(f"Error summarizing blog {blog.get('link')}: {e}") return None def get_blog_posts(rss_url): page = requests.get(rss_url) soup = BeautifulSoup(page.content, "xml") blog_map = {} blogs = soup.find_all("item") category = soup.find("title").contents[0] for blog in blogs: try: guid = blog.find("guid").contents[0] title = blog.find("title").contents[0] link = blog.find("link").contents[0] description = blog.find("description").contents[0] pub_date = blog.find("pubDate").contents[0] pub_date = ( datetime.strptime(pub_date, "%a, %d %b %Y %H:%M:%S +0000") .astimezone(timezone("US/Eastern")) .replace(second=0, minute=0, hour=0, microsecond=0) .date() ) today_date = ( datetime.now() .astimezone(timezone("US/Eastern")) .replace(second=0, minute=0, hour=0, microsecond=0) .date() ) is_updated_today = pub_date == today_date if is_updated_today: blog_map[guid] = { "category_name": category, "title": title, "link": link, "date": pub_date.strftime("%B %d, %Y"), } except AttributeError as e: continue return blog_map def get_stored_blog_posts(): doc_ref = firestore_client.collection("cloud-release-notes").document("blogs") blog_map = doc_ref.get().to_dict() return blog_map def get_new_blog_posts(blog_map=None): if blog_map is None: blog_map = get_blog_posts() stored_blog_map = get_stored_blog_posts() or {} new_blogs_map = {} for blog in blog_map.keys(): if blog not in stored_blog_map.keys(): new_blogs_map[blog] = blog_map[blog] return new_blogs_map def publish_to_pubsub(space_id, blog): """Publishes a message to Pub/Sub with space ID and HTML content.""" message_json = json.dumps( { "space_id": space_id, "blog": blog, } ).encode("utf-8") future = publisher.publish(topic_path, message_json) future.add_done_callback(callback) publish_futures.append(future) print(f"Published message ID: {future.result()}") def send_new_blogs(): blog_map = {} with futures.ThreadPoolExecutor() as executor: blogs_by_categories = executor.map(get_blog_posts, rss_urls) for category_blogs in blogs_by_categories: for guid, blog in category_blogs.items(): if blog: blog_map[guid] = blog new_blogs_map = get_new_blog_posts(blog_map) with futures.ThreadPoolExecutor() as executor: executor.map(summarize_blog, new_blogs_map.values()) subscriptions_ref = firestore_client.collection("space_blog_subscriptions") for blog in new_blogs_map.values(): print(f"New blog found: {blog['link']} in {blog['category_name']}") product_doc = subscriptions_ref.document(blog["category_name"]).get() if product_doc.exists: spaces_subscribed = product_doc.to_dict().get("spaces_subscribed", []) if blog.get("summary"): for space_id in spaces_subscribed: publish_to_pubsub(space_id, blog) else: print(f"Failed to generate summary for: {blog['link']}") futures.wait(publish_futures, return_when=futures.ALL_COMPLETED) if new_blogs_map: # Keep this part to update the Firestore document doc_ref = firestore_client.collection("cloud-release-notes").document("blogs") doc_ref.set(blog_map) @functions_framework.http def http_request(request): """HTTP Cloud Function. Args: request (flask.Request): The request object. <https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data> Returns: The response text, or any set of values that can be turned into a Response object using `make_response` <https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>. """ send_new_blogs() return ("Done", 200)