packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-components/backend/python/app/bot_remove.py (72 lines of code) (raw):
import json
import os
from typing import Any
import boto3
import pg8000
from app.repositories.apigateway import delete_api_key, find_usage_plan_by_id
from app.repositories.cloudformation import delete_stack_by_bot_id, find_stack_by_bot_id
from app.repositories.common import RecordNotFoundError, decompose_bot_id
from aws_lambda_powertools.utilities import parameters
DB_SECRETS_ARN = os.environ.get("DB_SECRETS_ARN", "")
DOCUMENT_BUCKET = os.environ.get("DOCUMENT_BUCKET", "documents")
s3_client = boto3.client("s3")
def delete_from_postgres(bot_id: str):
"""Delete data related to `bot_id` from vector store (i.e. PostgreSQL)."""
secrets: Any = parameters.get_secret(DB_SECRETS_ARN) # type: ignore
db_info = json.loads(secrets)
conn = pg8000.connect(
database=db_info["dbname"],
host=db_info["host"],
port=db_info["port"],
user=db_info["username"],
password=db_info["password"],
)
try:
with conn.cursor() as cursor:
delete_query = "DELETE FROM items WHERE botid = %s"
cursor.execute(delete_query, (bot_id,))
conn.commit()
print(f"Successfully deleted records for bot_id: {bot_id}")
except Exception as e:
conn.rollback()
print(f"Error deleting records for bot_id: {bot_id}")
print(e)
finally:
conn.close()
def delete_from_s3(user_id: str, bot_id: str):
"""Delete all files in S3 bucket for the specified `user_id` and `bot_id`."""
prefix = f"{user_id}/{bot_id}/"
try:
# List all objects with the specific prefix
objects_to_delete = s3_client.list_objects_v2(
Bucket=DOCUMENT_BUCKET, Prefix=prefix
)
if "Contents" in objects_to_delete:
# Prepare the list of objects to delete
delete_keys = [{"Key": obj["Key"]} for obj in objects_to_delete["Contents"]]
# Delete the objects
s3_client.delete_objects(
Bucket=DOCUMENT_BUCKET, Delete={"Objects": delete_keys}
)
print(f"Successfully deleted files from S3 for bot_id: {bot_id}")
else:
print("No files found to delete in S3.")
except Exception as e:
print(f"Error deleting files for bot_id: {bot_id}")
print(e)
def handler(event, context):
"""Bot removal handler.
This function is triggered by dynamodb stream when item is deleted.
Following resources are deleted asynchronously when bot is deleted:
- vector store record (postgres)
- s3 files
- cloudformation stack (if exists)
"""
print(f"Received event: {event}")
# NOTE: batch size is 1
record = event["Records"][0]
pk = record["dynamodb"]["Keys"]["PK"]["S"]
sk = record["dynamodb"]["Keys"].get("SK", {}).get("S")
if not sk or "#BOT#" not in sk:
# Ignore non-bot items
print(f"Skipping event for SK: {sk}")
return
user_id = pk
bot_id = decompose_bot_id(sk)
delete_from_postgres(bot_id)
delete_from_s3(user_id, bot_id)
# Check if cloudformation stack exists
try:
stack = find_stack_by_bot_id(bot_id)
except RecordNotFoundError:
print(f"Bot {bot_id} cloudformation stack not found. Skipping deletion.")
return
# Before delete cfn stack, delete all api keys
usage_plan = find_usage_plan_by_id(stack.api_usage_plan_id)
for key_id in usage_plan.key_ids:
delete_api_key(key_id)
# Delete `ApiPublishmentStack` by CloudFormation
delete_stack_by_bot_id(bot_id)