packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-components/backend/python/app/usecases/bot.py
import logging
import os
from app.agents.utils import get_available_tools, get_tool_by_name
from app.config import DEFAULT_EMBEDDING_CONFIG
from app.config import DEFAULT_GENERATION_CONFIG as DEFAULT_CLAUDE_GENERATION_CONFIG
from app.config import DEFAULT_MISTRAL_GENERATION_CONFIG, DEFAULT_SEARCH_CONFIG
from app.repositories.common import (
RecordNotFoundError,
_get_table_client,
decompose_bot_alias_id,
decompose_bot_id,
)
from app.repositories.custom_bot import (
delete_alias_by_id,
delete_bot_by_id,
find_alias_by_id,
find_private_bot_by_id,
find_public_bot_by_id,
store_alias,
store_bot,
update_alias_last_used_time,
update_alias_pin_status,
update_bot,
update_bot_last_used_time,
update_bot_pin_status,
)
from app.repositories.models.custom_bot import (
AgentModel,
AgentToolModel,
BotAliasModel,
BotMeta,
BotModel,
EmbeddingParamsModel,
GenerationParamsModel,
KnowledgeModel,
SearchParamsModel,
)
from app.routes.schemas.bot import (
Agent,
AgentTool,
BotInput,
BotModifyInput,
BotModifyOutput,
BotOutput,
BotSummaryOutput,
EmbeddingParams,
GenerationParams,
Knowledge,
SearchParams,
type_sync_status,
)
from app.utils import (
compose_upload_document_s3_path,
compose_upload_temp_s3_path,
compose_upload_temp_s3_prefix,
delete_file_from_s3,
delete_files_with_prefix_from_s3,
generate_presigned_url,
get_current_time,
move_file_in_s3,
)
from boto3.dynamodb.conditions import Attr, Key
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

DOCUMENT_BUCKET = os.environ.get("DOCUMENT_BUCKET", "bedrock-documents")
ENABLE_MISTRAL = os.environ.get("ENABLE_MISTRAL", "") == "true"
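# Select generation defaults based on the ENABLE_MISTRAL flag:
# Mistral defaults when enabled, otherwise the Claude defaults.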
DEFAULT_GENERATION_CONFIG = (
DEFAULT_MISTRAL_GENERATION_CONFIG
if ENABLE_MISTRAL
else DEFAULT_CLAUDE_GENERATION_CONFIG
)


def _update_s3_documents_by_diff(
user_id: str,
bot_id: str,
added_filenames: list[str],
deleted_filenames: list[str],
):
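    """Reflect document changes in S3: move newly added files from the temporary
    upload prefix to the bot's document prefix, and delete removed files from
    the document prefix.
    """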
for filename in added_filenames:
tmp_path = compose_upload_temp_s3_path(user_id, bot_id, filename)
document_path = compose_upload_document_s3_path(user_id, bot_id, filename)
move_file_in_s3(DOCUMENT_BUCKET, tmp_path, document_path)
for filename in deleted_filenames:
document_path = compose_upload_document_s3_path(user_id, bot_id, filename)
        delete_file_from_s3(DOCUMENT_BUCKET, document_path)


def create_new_bot(user_id: str, bot_input: BotInput) -> BotOutput:
"""Create a new bot.
Bot is created as private and not pinned.
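    When knowledge (source URLs, sitemap URLs, or uploaded files) is provided,
    uploaded files are moved from the temporary upload area to the bot's
    document area and the bot is queued for embedding.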
"""
current_time = get_current_time()
has_knowledge = bot_input.knowledge and (
len(bot_input.knowledge.source_urls) > 0
or len(bot_input.knowledge.sitemap_urls) > 0
or len(bot_input.knowledge.filenames) > 0
)
sync_status: type_sync_status = "QUEUED" if has_knowledge else "SUCCEEDED"
source_urls = []
sitemap_urls = []
filenames = []
if bot_input.knowledge:
source_urls = bot_input.knowledge.source_urls
sitemap_urls = bot_input.knowledge.sitemap_urls
# Commit changes to S3
_update_s3_documents_by_diff(
user_id, bot_input.id, bot_input.knowledge.filenames, []
)
# Delete files from upload temp directory
delete_files_with_prefix_from_s3(
DOCUMENT_BUCKET, compose_upload_temp_s3_prefix(user_id, bot_input.id)
)
filenames = bot_input.knowledge.filenames
chunk_size = (
bot_input.embedding_params.chunk_size
if bot_input.embedding_params
else DEFAULT_EMBEDDING_CONFIG["chunk_size"]
)
chunk_overlap = (
bot_input.embedding_params.chunk_overlap
if bot_input.embedding_params
else DEFAULT_EMBEDDING_CONFIG["chunk_overlap"]
)
enable_partition_pdf = (
bot_input.embedding_params.enable_partition_pdf
if bot_input.embedding_params
else DEFAULT_EMBEDDING_CONFIG["enable_partition_pdf"]
)
generation_params = (
bot_input.generation_params.model_dump()
if bot_input.generation_params
else DEFAULT_GENERATION_CONFIG
)
search_params = (
bot_input.search_params.model_dump()
if bot_input.search_params
else DEFAULT_SEARCH_CONFIG
)
agent = (
AgentModel(
tools=[
AgentToolModel(name=t.name, description=t.description)
for t in [
get_tool_by_name(tool_name) for tool_name in bot_input.agent.tools
]
]
)
if bot_input.agent
else AgentModel(tools=[])
)
store_bot(
user_id,
BotModel(
id=bot_input.id,
title=bot_input.title,
description=bot_input.description if bot_input.description else "",
instruction=bot_input.instruction,
create_time=current_time,
last_used_time=current_time,
public_bot_id=None,
is_pinned=False,
owner_user_id=user_id, # Owner is the creator
embedding_params=EmbeddingParamsModel(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
enable_partition_pdf=enable_partition_pdf,
),
generation_params=GenerationParamsModel(**generation_params), # type: ignore
search_params=SearchParamsModel(**search_params),
agent=agent,
knowledge=KnowledgeModel(
source_urls=source_urls, sitemap_urls=sitemap_urls, filenames=filenames
),
sync_status=sync_status,
sync_status_reason="",
sync_last_exec_id="",
published_api_stack_name=None,
published_api_datetime=None,
published_api_codebuild_id=None,
display_retrieved_chunks=bot_input.display_retrieved_chunks,
),
)
return BotOutput(
id=bot_input.id,
title=bot_input.title,
instruction=bot_input.instruction,
description=bot_input.description if bot_input.description else "",
create_time=current_time,
last_used_time=current_time,
is_public=False,
is_pinned=False,
owned=True,
embedding_params=EmbeddingParams(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
enable_partition_pdf=enable_partition_pdf,
),
generation_params=GenerationParams(**generation_params),
search_params=SearchParams(**search_params),
agent=Agent(
tools=[
AgentTool(name=tool.name, description=tool.description)
for tool in agent.tools
]
),
knowledge=Knowledge(
source_urls=source_urls, sitemap_urls=sitemap_urls, filenames=filenames
),
sync_status=sync_status,
sync_status_reason="",
sync_last_exec_id="",
display_retrieved_chunks=bot_input.display_retrieved_chunks,
    )


def modify_owned_bot(
user_id: str, bot_id: str, modify_input: BotModifyInput
) -> BotModifyOutput:
"""Modify owned bot."""
source_urls = []
sitemap_urls = []
filenames = []
sync_status: type_sync_status = "QUEUED"
if modify_input.knowledge:
source_urls = modify_input.knowledge.source_urls
sitemap_urls = modify_input.knowledge.sitemap_urls
# Commit changes to S3
_update_s3_documents_by_diff(
user_id,
bot_id,
modify_input.knowledge.added_filenames,
modify_input.knowledge.deleted_filenames,
)
# Delete files from upload temp directory
delete_files_with_prefix_from_s3(
DOCUMENT_BUCKET, compose_upload_temp_s3_prefix(user_id, bot_id)
)
filenames = (
modify_input.knowledge.added_filenames
+ modify_input.knowledge.unchanged_filenames
)
chunk_size = (
modify_input.embedding_params.chunk_size
if modify_input.embedding_params
else DEFAULT_EMBEDDING_CONFIG["chunk_size"]
)
chunk_overlap = (
modify_input.embedding_params.chunk_overlap
if modify_input.embedding_params
else DEFAULT_EMBEDDING_CONFIG["chunk_overlap"]
)
enable_partition_pdf = (
modify_input.embedding_params.enable_partition_pdf
if modify_input.embedding_params
else DEFAULT_EMBEDDING_CONFIG["enable_partition_pdf"]
)
generation_params = (
modify_input.generation_params.model_dump()
if modify_input.generation_params
else DEFAULT_GENERATION_CONFIG
)
search_params = (
modify_input.search_params.model_dump()
if modify_input.search_params
else DEFAULT_SEARCH_CONFIG
)
agent = (
AgentModel(
tools=[
AgentToolModel(name=t.name, description=t.description)
for t in [
get_tool_by_name(tool_name)
for tool_name in modify_input.agent.tools
]
]
)
if modify_input.agent
else AgentModel(tools=[])
)
    # If the knowledge and embedding params are not updated, skip the embedding process.
    # sync_status = "QUEUED" triggers the embedding process and updates the DynamoDB record.
    # sync_status = "SUCCEEDED" updates only the DynamoDB record.
bot = find_private_bot_by_id(user_id, bot_id)
sync_status = "QUEUED" if modify_input.is_embedding_required(bot) else "SUCCEEDED"
update_bot(
user_id,
bot_id,
title=modify_input.title,
instruction=modify_input.instruction,
description=modify_input.description if modify_input.description else "",
embedding_params=EmbeddingParamsModel(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
enable_partition_pdf=enable_partition_pdf,
),
generation_params=GenerationParamsModel(**generation_params),
search_params=SearchParamsModel(**search_params),
agent=agent,
knowledge=KnowledgeModel(
source_urls=source_urls,
sitemap_urls=sitemap_urls,
filenames=filenames,
),
sync_status=sync_status,
sync_status_reason="",
display_retrieved_chunks=modify_input.display_retrieved_chunks,
)
return BotModifyOutput(
id=bot_id,
title=modify_input.title,
instruction=modify_input.instruction,
description=modify_input.description if modify_input.description else "",
embedding_params=EmbeddingParams(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
enable_partition_pdf=enable_partition_pdf,
),
generation_params=GenerationParams(**generation_params),
search_params=SearchParams(**search_params),
agent=Agent(
tools=[
AgentTool(name=tool.name, description=tool.description)
for tool in agent.tools
]
),
knowledge=Knowledge(
source_urls=source_urls,
sitemap_urls=sitemap_urls,
filenames=filenames,
),
    )


def fetch_bot(user_id: str, bot_id: str) -> tuple[bool, BotModel]:
"""Fetch bot by id.
The first element of the returned tuple is whether the bot is owned or not.
`True` means the bot is owned by the user.
`False` means the bot is shared by another user.
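    Illustrative usage:
        owned, bot = fetch_bot(user_id, bot_id)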
"""
try:
return True, find_private_bot_by_id(user_id, bot_id)
except RecordNotFoundError:
        pass
try:
return False, find_public_bot_by_id(bot_id)
except RecordNotFoundError:
raise RecordNotFoundError(
f"Bot with ID {bot_id} not found in both private (for user {user_id}) and public items."
        )


def fetch_all_bots_by_user_id(
user_id: str, limit: int | None = None, only_pinned: bool = False
) -> list[BotMeta]:
"""Find all private & shared bots of a user.
The order is descending by `last_used_time`.
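    Specify exactly one of `limit` (1-100) or `only_pinned=True`; passing both
    or neither raises a ValueError.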
"""
if not only_pinned and not limit:
raise ValueError("Must specify either `limit` or `only_pinned`")
if limit and only_pinned:
raise ValueError("Cannot specify both `limit` and `only_pinned`")
if limit and (limit < 0 or limit > 100):
raise ValueError("Limit must be between 0 and 100")
table = _get_table_client(user_id)
logger.info(f"Finding pinned bots for user: {user_id}")
# Fetch all pinned bots
query_params = {
"IndexName": "LastBotUsedIndex",
"KeyConditionExpression": Key("PK").eq(user_id),
"ScanIndexForward": False,
}
if limit:
query_params["Limit"] = limit
if only_pinned:
query_params["FilterExpression"] = Attr("IsPinned").eq(True)
response = table.query(**query_params)
bots = []
for item in response["Items"]:
if "OriginalBotId" in item:
            # This item is an alias; fetch the original bot it points to
is_original_available = True
try:
bot = find_public_bot_by_id(item["OriginalBotId"])
logger.info(f"Found original bot: {bot.id}")
meta = BotMeta(
id=bot.id,
title=bot.title,
create_time=float(bot.create_time),
last_used_time=float(bot.last_used_time),
is_pinned=item["IsPinned"],
owned=False,
available=True,
description=bot.description,
is_public=True,
sync_status=bot.sync_status,
)
except RecordNotFoundError:
# Original bot is removed
is_original_available = False
logger.info(f"Original bot {item['OriginalBotId']} has been removed")
meta = BotMeta(
id=item["OriginalBotId"],
title=item["Title"],
create_time=float(item["CreateTime"]),
last_used_time=float(item["LastBotUsed"]),
is_pinned=item["IsPinned"],
owned=False,
# NOTE: Original bot is removed
available=False,
description="This item is no longer available",
is_public=False,
sync_status="ORIGINAL_NOT_FOUND",
)
if is_original_available and (
bot.title != item["Title"]
or bot.description != item["Description"]
or bot.sync_status != item["SyncStatus"]
or bot.has_knowledge() != item["HasKnowledge"]
):
# Update alias to the latest original bot
store_alias(
user_id,
BotAliasModel(
id=decompose_bot_alias_id(item["SK"]),
# Update title and description
title=bot.title,
description=bot.description,
original_bot_id=item["OriginalBotId"],
create_time=float(item["CreateTime"]),
last_used_time=float(item["LastBotUsed"]),
is_pinned=item["IsPinned"],
sync_status=bot.sync_status,
has_knowledge=bot.has_knowledge(),
),
)
bots.append(meta)
else:
            # Bots owned by the user (may also be shared publicly)
bots.append(
BotMeta(
id=decompose_bot_id(item["SK"]),
title=item["Title"],
create_time=float(item["CreateTime"]),
last_used_time=float(item["LastBotUsed"]),
is_pinned=item["IsPinned"],
owned=True,
available=True,
description=item["Description"],
is_public="PublicBotId" in item,
sync_status=item["SyncStatus"],
)
)
    return bots


def fetch_bot_summary(user_id: str, bot_id: str) -> BotSummaryOutput:
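    """Fetch a summary of a bot.
    Lookup order: the user's own bot, then an existing alias, then the shared
    (public) bot itself; in the last case an alias is stored for the user.
    """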
try:
bot = find_private_bot_by_id(user_id, bot_id)
return BotSummaryOutput(
id=bot_id,
title=bot.title,
description=bot.description,
create_time=bot.create_time,
last_used_time=bot.last_used_time,
is_pinned=bot.is_pinned,
            is_public=bool(bot.public_bot_id),
has_agent=bot.is_agent_enabled(),
owned=True,
sync_status=bot.sync_status,
has_knowledge=bot.has_knowledge(),
)
except RecordNotFoundError:
pass
    try:
        alias = find_alias_by_id(user_id, bot_id)
        # NOTE: The alias record does not carry agent information, so look up
        # the original shared bot to determine whether an agent is enabled.
        original_bot = find_public_bot_by_id(alias.original_bot_id)
        return BotSummaryOutput(
            id=alias.id,
            title=alias.title,
            description=alias.description,
            create_time=alias.create_time,
            last_used_time=alias.last_used_time,
            is_pinned=alias.is_pinned,
            is_public=True,
            has_agent=original_bot.is_agent_enabled(),
            owned=False,
            sync_status=alias.sync_status,
            has_knowledge=alias.has_knowledge,
        )
except RecordNotFoundError:
pass
try:
        # NOTE: The first time a user opens a shared bot, its alias does not exist yet.
bot = find_public_bot_by_id(bot_id)
current_time = get_current_time()
        # Create the alias when the shared bot page is opened
store_alias(
user_id,
BotAliasModel(
id=bot.id,
title=bot.title,
description=bot.description,
original_bot_id=bot_id,
create_time=current_time,
last_used_time=current_time,
is_pinned=False,
sync_status=bot.sync_status,
has_knowledge=bot.has_knowledge(),
),
)
return BotSummaryOutput(
id=bot_id,
title=bot.title,
description=bot.description,
create_time=bot.create_time,
last_used_time=bot.last_used_time,
is_pinned=False, # NOTE: Shared bot is not pinned by default.
is_public=True,
has_agent=bot.is_agent_enabled(),
owned=False,
sync_status=bot.sync_status,
has_knowledge=bot.has_knowledge(),
)
except RecordNotFoundError:
raise RecordNotFoundError(
f"Bot with ID {bot_id} not found in both private (for user {user_id}) and alias, shared items."
        )


def modify_pin_status(user_id: str, bot_id: str, pinned: bool):
"""Modify bot pin status."""
try:
return update_bot_pin_status(user_id, bot_id, pinned)
except RecordNotFoundError:
pass
try:
return update_alias_pin_status(user_id, bot_id, pinned)
except RecordNotFoundError:
raise RecordNotFoundError(f"Bot {bot_id} is neither owned nor alias.")
def remove_bot_by_id(user_id: str, bot_id: str):
"""Remove bot by id."""
try:
return delete_bot_by_id(user_id, bot_id)
except RecordNotFoundError:
pass
try:
return delete_alias_by_id(user_id, bot_id)
except RecordNotFoundError:
raise RecordNotFoundError(f"Bot {bot_id} is neither owned nor alias.")
def modify_bot_last_used_time(user_id: str, bot_id: str):
"""Modify bot last used time."""
try:
return update_bot_last_used_time(user_id, bot_id)
except RecordNotFoundError:
pass
try:
return update_alias_last_used_time(user_id, bot_id)
except RecordNotFoundError:
raise RecordNotFoundError(f"Bot {bot_id} is neither owned nor alias.")
def issue_presigned_url(
user_id: str, bot_id: str, filename: str, content_type: str
) -> str:
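    """Issue a presigned URL for uploading a document to the temporary upload
    area (PUT request, expires in one hour).
    """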
response = generate_presigned_url(
DOCUMENT_BUCKET,
compose_upload_temp_s3_path(user_id, bot_id, filename),
content_type=content_type,
expiration=3600,
client_method="put_object",
)
    return response


def remove_uploaded_file(user_id: str, bot_id: str, filename: str):
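    """Delete an uploaded file from the temporary upload area."""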
delete_file_from_s3(
DOCUMENT_BUCKET, compose_upload_temp_s3_path(user_id, bot_id, filename)
)
    return


def fetch_available_agent_tools():
"""Fetch available tools for bot."""
return get_available_tools()