packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-components/backend/python/app/utils.py (141 lines of code) (raw):

import json
import logging
import os
from datetime import datetime
from typing import Any, List, Literal

import boto3
import pg8000
from anthropic import AnthropicBedrock
from app.repositories.models.conversation import MessageModel
from aws_lambda_powertools.utilities import parameters
from botocore.client import Config
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

# Deployment configuration, read once at import time from the environment.
REGION = os.environ.get("REGION", "us-east-1")
BEDROCK_REGION = os.environ.get("BEDROCK_REGION", "us-east-1")
PUBLISH_API_CODEBUILD_PROJECT_NAME = os.environ.get(
    "PUBLISH_API_CODEBUILD_PROJECT_NAME", ""
)
DB_SECRETS_ARN = os.environ.get("DB_SECRETS_ARN", "")


def is_running_on_lambda():
    """Return True when the `AWS_EXECUTION_ENV` environment variable is set."""
    return "AWS_EXECUTION_ENV" in os.environ


def is_anthropic_model(model_id: str) -> bool:
    """Return True when `model_id` starts with the `anthropic` prefix."""
    return model_id.startswith("anthropic")


def get_bedrock_client(region=BEDROCK_REGION):
    """Create a boto3 `bedrock-runtime` client for the given region."""
    return boto3.client("bedrock-runtime", region_name=region)


def get_anthropic_client(region=BEDROCK_REGION):
    """Create an `AnthropicBedrock` client for the given region."""
    return AnthropicBedrock(aws_region=region)


def get_current_time():
    """Return the current time as an epoch timestamp in milliseconds."""
    return int(datetime.now().timestamp() * 1000)


def generate_presigned_url(
    bucket: str,
    key: str,
    content_type: str | None = None,
    expiration=3600,
    client_method: Literal["put_object", "get_object"] = "put_object",
):
    """Generate a presigned S3 URL for uploading or downloading an object.

    Args:
        bucket (str): Name of the S3 bucket.
        key (str): Object key inside the bucket.
        content_type (str, optional): When given, it is added to the signed
            parameters as `ContentType`. Defaults to None.
        expiration (int, optional): Lifetime of the URL in seconds.
            Defaults to 3600.
        client_method (str, optional): S3 client method to sign, either
            "put_object" (HTTP PUT) or "get_object" (HTTP GET).
            Defaults to "put_object".

    Returns:
        The presigned URL produced by boto3.
    """
    # See: https://github.com/boto/boto3/issues/421#issuecomment-1849066655
    client = boto3.client(
        "s3",
        region_name=REGION,
        config=Config(signature_version="v4", s3={"addressing_style": "path"}),
    )
    params = {"Bucket": bucket, "Key": key}
    if content_type:
        params["ContentType"] = content_type
    http_method = "PUT" if client_method == "put_object" else "GET"
    return client.generate_presigned_url(
        ClientMethod=client_method,
        Params=params,
        ExpiresIn=expiration,
        HttpMethod=http_method,
    )


def compose_upload_temp_s3_prefix(user_id: str, bot_id: str) -> str:
    """Compose the S3 key prefix under which temporary uploads are stored."""
    return f"{user_id}/{bot_id}/_temp/"


def compose_upload_temp_s3_path(user_id: str, bot_id: str, filename: str) -> str:
    """Compose S3 path for temporary files.
    This path is used for uploading files to S3.
    """
    return f"{compose_upload_temp_s3_prefix(user_id, bot_id)}{filename}"


def compose_upload_document_s3_path(user_id: str, bot_id: str, filename: str) -> str:
    """Compose S3 path for documents.
    The files on this path are used for embedding.
    """
    return f"{user_id}/{bot_id}/documents/{filename}"


def _object_exists(client, bucket: str, key: str) -> bool:
    """Return whether `key` exists in `bucket`, probing with `head_object`.

    A `ClientError` whose error code is "404" is reported as False; any other
    `ClientError` is propagated to the caller unchanged.
    """
    try:
        client.head_object(Bucket=bucket, Key=key)
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        raise
    return True


def delete_file_from_s3(bucket: str, key: str):
    """Delete a single object from S3.

    Raises:
        FileNotFoundError: If the object does not exist.
        ClientError: For any other S3 failure.

    Returns:
        The `delete_object` response.
    """
    client = boto3.client("s3")
    if not _object_exists(client, bucket, key):
        raise FileNotFoundError("The file does not exist in bucket.")
    return client.delete_object(Bucket=bucket, Key=key)


def delete_files_with_prefix_from_s3(bucket: str, prefix: str):
    """Delete all objects with the given prefix from the given bucket."""
    client = boto3.client("s3")
    # A single list_objects_v2 call returns at most 1,000 keys, so iterate over
    # every page; otherwise larger prefixes would only be partially deleted.
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages (including the only page of an empty listing) may lack "Contents".
        for obj in page.get("Contents", []):
            client.delete_object(Bucket=bucket, Key=obj["Key"])


def check_if_file_exists_in_s3(bucket: str, key: str):
    """Return True if the object exists in the bucket, False otherwise.

    Raises:
        ClientError: For any S3 failure other than a "404" on the object.
    """
    client = boto3.client("s3")
    return _object_exists(client, bucket, key)


def move_file_in_s3(bucket: str, key: str, new_key: str):
    """Move an object within a bucket by copying it and deleting the source.

    Raises:
        FileNotFoundError: If the source object does not exist.
        ClientError: For any other S3 failure.

    Returns:
        The `delete_object` response for the source key.
    """
    client = boto3.client("s3")
    if not _object_exists(client, bucket, key):
        raise FileNotFoundError("The file does not exist in bucket.")

    client.copy_object(
        Bucket=bucket, Key=new_key, CopySource={"Bucket": bucket, "Key": key}
    )
    return client.delete_object(Bucket=bucket, Key=key)


def start_codebuild_project(environment_variables: dict) -> str:
    """Start a build of the publish-API CodeBuild project.

    Args:
        environment_variables (dict): Name/value pairs passed to the build as
            environment variable overrides.

    Returns:
        str: The id of the started build.
    """
    environment_variables_override = [
        {"name": key, "value": value} for key, value in environment_variables.items()
    ]
    client = boto3.client("codebuild")
    response = client.start_build(
        projectName=PUBLISH_API_CODEBUILD_PROJECT_NAME,
        environmentVariablesOverride=environment_variables_override,
    )
    return response["build"]["id"]


def query_postgres(
    query: str,
    params: tuple | None = None,
    include_columns: bool = False,
) -> tuple:
    """Query the PostgreSQL and return the results.

    Connection settings are loaded as JSON from the secret identified by
    `DB_SECRETS_ARN`; the connection is always closed before returning.

    Args:
        query (str): The SQL query to execute.
        params (tuple, optional): The parameters for the query template.
            Defaults to None.
        include_columns (bool, optional): Whether to include the column names
            in the result. Defaults to False.

    Returns:
        tuple: The rows returned by `cursor.fetchall()` if include_columns is
            False, otherwise the 2-tuple `(columns, rows)` where `columns` is
            a tuple of column names.
            example (row container types depend on the driver):
                ((1, 'Alice'), (2, 'Bob')) if include_columns is False
                (('id', 'name'), ((1, 'Alice'), (2, 'Bob'))) if include_columns is True

    Raises:
        Exception: Any error raised while executing the query is logged and
            re-raised.
    """
    secrets: Any = parameters.get_secret(DB_SECRETS_ARN)  # type: ignore
    db_info = json.loads(secrets)

    conn = pg8000.connect(
        database=db_info["dbname"],
        host=db_info["host"],
        port=db_info["port"],
        user=db_info["username"],
        password=db_info["password"],
    )
    args = params if params else ()
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, args=args)
            res = cursor.fetchall()
            columns = tuple(desc[0] for desc in cursor.description)
    except Exception as e:
        logger.error("Error executing query: %s", e)
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        conn.close()

    logger.debug("%s records found.", len(res))
    if include_columns:
        return columns, res
    return res