in packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-components/backend/python/app/repositories/conversation.py [0:0]
def delete_conversation_by_user_id(user_id: str):
    """Delete every conversation item stored for ``user_id``.

    The table is queried page by page for items whose partition key equals
    ``user_id`` and whose sort key begins with ``"{user_id}#CONV#"``. For each
    page, the S3 objects referenced by items flagged ``IsLargeMessage`` are
    deleted first, then the items themselves are deleted from the table in
    chunks of ``TRANSACTION_BATCH_SIZE``.

    A ``ClientError`` raised while querying or deleting is logged and not
    re-raised; processing stops at that point and anything already deleted
    stays deleted.

    Args:
        user_id: Partition key value whose conversation items are removed.
    """
    logger.info(f"Deleting ALL conversations for user: {user_id}")
    conv_table = _get_table_client(user_id)

    # NOTE: The sort-key prefix is needed so only conversation items match.
    only_conversations = Key("PK").eq(user_id) & Key("SK").begins_with(
        f"{user_id}#CONV#"
    )
    query_kwargs = {
        "KeyConditionExpression": only_conversations,
        "ProjectionExpression": "SK, IsLargeMessage, LargeMessagePath",
    }

    def purge_page(records):
        # Delete the S3 objects referenced by large-message items before
        # touching the table rows of this page.
        for record in records:
            if record.get("IsLargeMessage", False):
                s3_client.delete_object(
                    Bucket=LARGE_MESSAGE_BUCKET, Key=record["LargeMessagePath"]
                )

        # Delete the table rows, one batch writer per chunk.
        for start in range(0, len(records), TRANSACTION_BATCH_SIZE):
            chunk = records[start : start + TRANSACTION_BATCH_SIZE]
            with conv_table.batch_writer() as writer:
                for record in chunk:
                    writer.delete_item(Key={"PK": user_id, "SK": record["SK"]})

    try:
        page = conv_table.query(**query_kwargs)
        purge_page(page.get("Items", []))

        # A page carrying "LastEvaluatedKey" means more results remain;
        # resume the query from that key.
        while "LastEvaluatedKey" in page:
            query_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]
            page = conv_table.query(**query_kwargs)
            purge_page(page.get("Items", []))
    except ClientError as e:
        logger.error(f"An error occurred: {e.response['Error']['Message']}")