in packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-components/backend/python/app/usecases/publication.py [0:0]
def fetch_bot_publication(user: User, bot_id: str) -> BotPublishOutput:
    """Return the publication state of the bot identified by ``bot_id``.

    The bot is loaded through ``_fetch_bot_with_permission_check``. What is
    returned depends on how far publication has progressed:

    * No stack record exists for the bot (``find_stack_by_bot_id`` raises
      ``RecordNotFoundError``): only the CodeBuild id and status are filled
      in; the CloudFormation status and all API details are blank.
    * A stack record exists but the CodeBuild status is not ``"SUCCEEDED"``:
      same as above, plus the stack's status.
    * The CodeBuild status is ``"SUCCEEDED"``: stage, quota, throttle,
      allowed origins, endpoint URL and API key ids are populated from the
      stack record and its usage plan.

    Raises:
        ValueError: If the bot has no ``published_api_codebuild_id``.
    """
    bot = _fetch_bot_with_permission_check(user, bot_id)
    build_id = bot.published_api_codebuild_id
    if build_id is None:
        raise ValueError(f"Bot {bot_id} is not published.")

    build_status = find_build_status_by_build_id(build_id)

    def _without_api_details(cfn_status) -> BotPublishOutput:
        # Shared shape for both "not ready yet" answers: every API-related
        # field is blank and only the CloudFormation status varies.
        return BotPublishOutput(
            stage="",
            quota=PublishedApiQuota(limit=None, offset=None, period=None),
            throttle=PublishedApiThrottle(rate_limit=None, burst_limit=None),
            allowed_origins=[],
            cfn_status=cfn_status,
            codebuild_id=build_id,
            codebuild_status=build_status,
            endpoint="",
            api_key_ids=[],
        )

    try:
        stack = find_stack_by_bot_id(bot_id)
    except RecordNotFoundError:
        # CodeBuild has started but stack creation has not.
        return _without_api_details("")

    if build_status == "SUCCEEDED":
        logger.info(f"Bot {bot_id} is published. Fetching API Gateway information.")
        usage_plan = find_usage_plan_by_id(stack.api_usage_plan_id)  # type: ignore

        api_stage = stack.api_stage  # type: ignore
        quota = PublishedApiQuota(
            limit=usage_plan.quota.limit,
            offset=usage_plan.quota.offset,
            period=usage_plan.quota.period,
        )
        throttle = PublishedApiThrottle(
            rate_limit=usage_plan.throttle.rate_limit,
            burst_limit=usage_plan.throttle.burst_limit,
        )
        endpoint = (
            f"https://{stack.api_id}.execute-api.{REGION}.amazonaws.com/{stack.api_stage}"
        )
        return BotPublishOutput(
            stage=api_stage,
            quota=quota,
            throttle=throttle,
            allowed_origins=stack.api_allowed_origins,  # type: ignore
            cfn_status=stack.stack_status,
            codebuild_id=build_id,
            codebuild_status=build_status,
            endpoint=endpoint,
            api_key_ids=usage_plan.key_ids,
        )

    # The stack record exists but the build has not succeeded: report the
    # CloudFormation status alongside the build status.
    return _without_api_details(stack.stack_status)