packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-components/backend/python/app/usecases/publication.py (217 lines of code) (raw):
import logging
import os
from app.repositories.api_publication import (
create_api_key,
delete_api_key,
delete_stack_by_bot_id,
find_api_key_by_id,
find_build_status_by_build_id,
find_stack_by_bot_id,
find_usage_plan_by_id,
)
from app.repositories.common import RecordNotFoundError, ResourceConflictError
from app.repositories.custom_bot import (
delete_bot_publication,
find_private_bot_by_id,
find_public_bot_by_id,
update_bot_publication,
)
from app.repositories.models.custom_bot import BotModel
from app.routes.schemas.api_publication import (
ApiKeyInput,
ApiKeyOutput,
BotPublishInput,
BotPublishOutput,
PublishedApiQuota,
PublishedApiThrottle,
)
from app.user import User
from app.utils import start_codebuild_project
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# AWS region inserted into the published API endpoint URL.
# Falls back to "us-east-1" when the REGION environment variable is unset.
REGION = os.getenv("REGION", "us-east-1")
def _fetch_bot_with_permission_check(user: User, bot_id: str) -> BotModel:
    """Load the bot that `user` is allowed to operate on.

    Admin users are resolved through `find_public_bot_by_id`; every other
    user is resolved through `find_private_bot_by_id` with their own user id.

    Args:
        user: The requesting user.
        bot_id: Identifier of the bot to load.

    Returns:
        The bot model returned by the repository lookup.

    Raises:
        RecordNotFoundError: If the lookup for the applicable path finds nothing.
    """
    if not user.is_admin():
        # Non-admin path: only bots stored under the caller's own user id.
        try:
            return find_private_bot_by_id(user.id, bot_id)
        except RecordNotFoundError:
            raise RecordNotFoundError(
                f"Bot {bot_id} is not published or not owned by user {user.id}."
            )

    # Admin path: look the bot up among public (shared) bots.
    try:
        return find_public_bot_by_id(bot_id)
    except RecordNotFoundError:
        raise RecordNotFoundError(f"Bot {bot_id} is not found or shared.")
def create_bot_publication(user: User, bot_id: str, bot_publish_input: BotPublishInput):
    """Publish an API for the bot.

    Verifies that the bot belongs to `user`, is shared, and has no existing
    publication, then starts the CodeBuild project that creates the
    `ApiPublishmentStack` and records the build id on the bot.

    Args:
        user: The requesting user; must own the bot.
        bot_id: Identifier of the bot to publish.
        bot_publish_input: Throttle, quota, stage and allowed-origin settings.
            Settings left as `None` are not passed to the build.

    Raises:
        RecordNotFoundError: If the user has no bot with this id.
        ValueError: If the bot is not shared, or a finished publication
            (succeeded or failed) already exists.
        ResourceConflictError: If a previous publication build is still running.
    """
    # Local import keeps this block self-contained (the module does not import json).
    import json

    # Check existence and permission of the bot
    try:
        bot = find_private_bot_by_id(user.id, bot_id)
    except RecordNotFoundError:
        raise RecordNotFoundError(f"Bot {bot_id} is not found.")

    if bot.public_bot_id is None:
        raise ValueError(f"Bot {bot_id} is not shared. Cannot publish.")

    if bot.published_api_codebuild_id is not None:
        codebuild_status = find_build_status_by_build_id(bot.published_api_codebuild_id)
        if codebuild_status not in ["SUCCEEDED", "FAILED"]:
            raise ResourceConflictError(
                f"Bot {bot_id} publication is already requested (build id: {bot.published_api_codebuild_id}). Please wait until the previous publication is completed."
            )
        # A finished build (even a failed one) must be removed before re-publishing.
        raise ValueError(
            f"Bot {bot_id} is already published. Please remove the publication before re-publishing."
        )

    # Same value as `bot_id` is used for `published_api_id`
    published_api_id = bot_id

    # Set environment variables.
    # NOTE: default values are set in `cdk/lib/constructs/api-publish-codebuild.ts`
    optional_settings = {
        "PUBLISHED_API_THROTTLE_RATE_LIMIT": bot_publish_input.throttle.rate_limit,
        "PUBLISHED_API_THROTTLE_BURST_LIMIT": bot_publish_input.throttle.burst_limit,
        "PUBLISHED_API_QUOTA_LIMIT": bot_publish_input.quota.limit,
        "PUBLISHED_API_QUOTA_PERIOD": bot_publish_input.quota.period,
        "PUBLISHED_API_DEPLOYMENT_STAGE": bot_publish_input.stage,
    }
    environment_variables = {"PUBLISHED_API_ID": published_api_id}
    environment_variables.update(
        {
            name: str(value)
            for name, value in optional_settings.items()
            if value is not None
        }
    )

    if bot_publish_input.allowed_origins is not None:
        # Serialize as a compact JSON array. Rewriting the list's repr with
        # str.replace would corrupt origins that contain spaces or quotes.
        environment_variables["PUBLISHED_API_ALLOWED_ORIGINS"] = json.dumps(
            [str(origin) for origin in bot_publish_input.allowed_origins],
            separators=(",", ":"),
            ensure_ascii=False,
        )

    # Create `ApiPublishmentStack` by CodeBuild; any failure propagates to the caller.
    build_id = start_codebuild_project(environment_variables=environment_variables)

    # Update bot attribute
    update_bot_publication(
        user.id, bot_id, published_api_id=published_api_id, build_id=build_id
    )
def fetch_bot_publication(user: User, bot_id: str) -> BotPublishOutput:
    """Get published bot by id.

    Args:
        user: The requesting user.
        bot_id: Identifier of the bot whose publication is queried.

    Returns:
        The publication details. While the stack is missing or the CodeBuild
        status is not "SUCCEEDED", only the build id, build status and (when a
        stack exists) the stack status are filled in; the remaining fields are
        empty placeholders.

    Raises:
        ValueError: If the bot has no publication build id.
    """
    bot = _fetch_bot_with_permission_check(user, bot_id)

    build_id = bot.published_api_codebuild_id
    if build_id is None:
        raise ValueError(f"Bot {bot_id} is not published.")

    build_status = find_build_status_by_build_id(build_id)

    def _placeholder(cfn_status: str) -> BotPublishOutput:
        # Output for a publication whose API details are not available yet.
        return BotPublishOutput(
            stage="",
            quota=PublishedApiQuota(limit=None, offset=None, period=None),
            throttle=PublishedApiThrottle(rate_limit=None, burst_limit=None),
            allowed_origins=[],
            cfn_status=cfn_status,
            codebuild_id=build_id,
            codebuild_status=build_status,
            endpoint="",
            api_key_ids=[],
        )

    try:
        stack = find_stack_by_bot_id(bot_id)
    except RecordNotFoundError:
        # Codebuild started but stack creation is not started
        return _placeholder("")

    if build_status != "SUCCEEDED":
        # Return with cloudformation status
        return _placeholder(stack.stack_status)

    logger.info(f"Bot {bot_id} is published. Fetching API Gateway information.")
    plan = find_usage_plan_by_id(stack.api_usage_plan_id)  # type: ignore

    quota = PublishedApiQuota(
        limit=plan.quota.limit,
        offset=plan.quota.offset,
        period=plan.quota.period,
    )
    throttle = PublishedApiThrottle(
        rate_limit=plan.throttle.rate_limit,
        burst_limit=plan.throttle.burst_limit,
    )
    endpoint = f"https://{stack.api_id}.execute-api.{REGION}.amazonaws.com/{stack.api_stage}"

    return BotPublishOutput(
        stage=stack.api_stage,  # type: ignore
        quota=quota,
        throttle=throttle,
        allowed_origins=stack.api_allowed_origins,  # type: ignore
        cfn_status=stack.stack_status,
        codebuild_id=build_id,
        codebuild_status=build_status,
        endpoint=endpoint,
        api_key_ids=plan.key_ids,
    )
def remove_bot_publication(user: User, bot_id: str):
    """Remove published bot by id.

    Deletes the API keys attached to the stack's usage plan, deletes the
    publication stack, and clears the publication attributes on the bot.
    If no stack exists, only the bot attributes are cleared.

    Args:
        user: The requesting user.
        bot_id: Identifier of the bot whose publication is removed.

    Raises:
        ValueError: If the bot is not published, or its publication build
            has not finished yet (status is neither "SUCCEEDED" nor "FAILED").
    """
    bot = _fetch_bot_with_permission_check(user, bot_id)

    if bot.published_api_codebuild_id is None:
        raise ValueError(f"Bot {bot_id} is not published.")

    # Refuse to remove while the CodeBuild run has not finished.
    codebuild_status = find_build_status_by_build_id(bot.published_api_codebuild_id)
    if codebuild_status not in ["SUCCEEDED", "FAILED"]:
        raise ValueError(
            f"Bot {bot_id} publication is requested (build id: {bot.published_api_codebuild_id}) but not completed. Wait until the publication is completed."
        )

    try:
        stack = find_stack_by_bot_id(bot_id)
    except RecordNotFoundError:
        # No stack to delete; just delete the bot attribute and return.
        delete_bot_publication(bot.owner_user_id, bot_id)
        return

    # Before delete cfn stack, delete all api keys.
    # The status literal is "CREATE_COMPLETE", as in the other stack checks in
    # this module; "CREATE_COMPLETED" never matched, so keys were never deleted.
    if stack.stack_status == "CREATE_COMPLETE":
        usage_plan = find_usage_plan_by_id(stack.api_usage_plan_id)  # type: ignore
        for key_id in usage_plan.key_ids:
            delete_api_key(key_id)

    # Delete `ApiPublishmentStack` by CloudFormation
    delete_stack_by_bot_id(bot_id)

    # Delete bot attribute
    delete_bot_publication(bot.owner_user_id, bot_id)
def fetch_api_key(user: User, bot_id: str, api_key: str) -> ApiKeyOutput:
    """Fetch one API key of a published bot, including its value.

    Args:
        user: The requesting user.
        bot_id: Identifier of the published bot.
        api_key: Identifier of the API key to fetch; it is checked against
            the key ids of the bot's usage plan.

    Returns:
        The key's id, value, description, enabled flag and creation date.

    Raises:
        AssertionError: If the bot's stack status is not "CREATE_COMPLETE".
        PermissionError: If the key is not attached to the bot's usage plan.
    """
    # Called only for its permission check; the returned bot is not needed.
    _fetch_bot_with_permission_check(user, bot_id)

    stack = find_stack_by_bot_id(bot_id)
    # Raised explicitly instead of using `assert`, so the check still runs
    # under `python -O`; the exception type and message are unchanged.
    if stack.stack_status != "CREATE_COMPLETE":
        raise AssertionError(f"Bot {bot_id} stack creation is not completed.")

    usage_plan = find_usage_plan_by_id(stack.api_usage_plan_id)  # type: ignore

    # Check if the API key is associated with the bot and user
    if api_key not in usage_plan.key_ids:
        raise PermissionError(f"API Key {api_key} is not associated with bot {bot_id}.")

    # Fetch API Key
    key = find_api_key_by_id(api_key, include_value=True)
    return ApiKeyOutput(
        id=key.id,
        value=key.value,
        description=key.description,
        enabled=key.enabled,
        created_date=key.created_date,
    )
def create_new_api_key(
    user: User, bot_id: str, api_key_input: ApiKeyInput
) -> ApiKeyOutput:
    """Create a new API key on the usage plan of a published bot.

    Args:
        user: The requesting user.
        bot_id: Identifier of the published bot.
        api_key_input: Input carrying the description for the new key.

    Returns:
        The new key's id, description, enabled flag and creation date.
        `value` is returned as an empty string.

    Raises:
        AssertionError: If the bot's stack status is not "CREATE_COMPLETE".
    """
    # Called only for its permission check; the returned bot is not needed.
    _fetch_bot_with_permission_check(user, bot_id)

    stack = find_stack_by_bot_id(bot_id)
    # Raised explicitly instead of using `assert`, so the check still runs
    # under `python -O`; the exception type and message are unchanged.
    if stack.stack_status != "CREATE_COMPLETE":
        raise AssertionError(f"Bot {bot_id} stack creation is not completed.")

    usage_plan = find_usage_plan_by_id(stack.api_usage_plan_id)  # type: ignore

    # Create API Key
    key = create_api_key(usage_plan.id, api_key_input.description)
    return ApiKeyOutput(
        id=key.id,
        value="",
        description=key.description,
        enabled=key.enabled,
        created_date=key.created_date,
    )
def remove_api_key(user: User, bot_id: str, api_key_id: str):
    """Delete one API key of a published bot.

    Args:
        user: The requesting user.
        bot_id: Identifier of the published bot.
        api_key_id: Identifier of the API key to delete.

    Raises:
        AssertionError: If the bot's stack status is not "CREATE_COMPLETE".
        PermissionError: If the key is not attached to the bot's usage plan.
    """
    # Called only for its permission check; the returned bot is not needed.
    _fetch_bot_with_permission_check(user, bot_id)

    stack = find_stack_by_bot_id(bot_id)
    # Raised explicitly instead of using `assert`, so the check still runs
    # under `python -O`; the exception type and message are unchanged.
    if stack.stack_status != "CREATE_COMPLETE":
        raise AssertionError(f"Bot {bot_id} stack creation is not completed.")

    usage_plan = find_usage_plan_by_id(stack.api_usage_plan_id)  # type: ignore

    # Check if the API key is associated with the bot and user
    if api_key_id not in usage_plan.key_ids:
        raise PermissionError(
            f"API Key {api_key_id} is not associated with bot {bot_id}."
        )

    # Delete API Key
    delete_api_key(api_key_id)