test-infra/cleanup_pubsub_notifications.py (34 lines of code) (raw):

import datetime
import re

from google.cloud import storage

# Delete notifications whose age is greater than or equal to this many days.
delete_threshold_in_days = 2

# First run of eight consecutive digits in a prefix, interpreted as YYYYMMDD.
_DATE_PATTERN = re.compile(r'(\d{8})')


def delete_old_pubsub_notifications():
    """Delete stale Pub/Sub notification configs from the test buckets.

    For every bucket in the hard-coded list, each notification whose object
    name prefix embeds a YYYYMMDD date at least ``delete_threshold_in_days``
    days before today's UTC date is deleted. Notifications without a usable
    date in their prefix, and notifications newer than the threshold, are
    left in place.

    Returns:
        A ``(message, status_code)`` tuple: always
        ``("Old Pub/Sub notifications deleted", 200)``.
        NOTE(review): the 200 looks like an HTTP status for a web-style
        caller - confirm against how this function is invoked.

    Raises:
        Whatever the storage client raises while listing or deleting
        notifications (for example when a bucket is missing or access is
        denied); such errors are not handled here.
    """
    storage_client = storage.Client()

    buckets = [f"cloud-teleport-spanner-it-{i}" for i in range(10)]
    buckets += [
        "cloud-teleport-testing-it-gitactions",
        "cloud-teleport-testing-it",
        "cloud-teleport-testing",
    ]

    # Today's date in UTC; timezone.utc is equivalent to datetime.UTC but is
    # also available on interpreters older than Python 3.11.
    today = datetime.datetime.now(datetime.timezone.utc).date()
    threshold = datetime.timedelta(days=delete_threshold_in_days)

    for bucket_name in buckets:
        print("Cleaning up notifications for bucket", bucket_name)
        bucket = storage_client.bucket(bucket_name)
        notifications = list(bucket.list_notifications())

        if not notifications:
            print(f"Bucket '{bucket_name}' has no notifications.")
            continue

        for notification in notifications:
            # Public accessor for the "object_name_prefix" property; it may
            # be None when the notification has no prefix filter.
            object_name_prefix = notification.blob_name_prefix or ""

            # Try to extract the date from the prefix (format: YYYYMMDD).
            date_str = extract_date_from_prefix(object_name_prefix)
            if not date_str:
                print("Skipping notification - No valid date found in prefix")
                continue

            try:
                notification_date = datetime.datetime.strptime(
                    date_str, "%Y%m%d"
                ).date()
            except ValueError:
                # Eight digits that are not a calendar date (e.g. the start
                # of a longer numeric id) must not abort the whole cleanup.
                print(
                    "Skipping notification - "
                    f"'{date_str}' in prefix is not a valid date"
                )
                continue

            if today - notification_date >= threshold:
                print(
                    f"Deleting old notification from {bucket_name}, "
                    f"created on {notification_date}"
                )
                notification.delete()
            else:
                print(
                    "Skipping recent notification "
                    f"created on {notification_date}"
                )

    return "Old Pub/Sub notifications deleted", 200


def extract_date_from_prefix(prefix):
    """Extract a YYYYMMDD candidate from an object name prefix.

    Args:
        prefix: The notification's object name prefix; may be empty or None.

    Returns:
        The first run of eight consecutive digits found in ``prefix`` as a
        string, or ``None`` when there is none. The digits are not checked
        for being a real calendar date; callers must validate them.
    """
    if not prefix:
        return None
    match = _DATE_PATTERN.search(prefix)  # Look for a YYYYMMDD pattern
    return match.group(1) if match else None


if __name__ == "__main__":
    delete_old_pubsub_notifications()