awswrangler/athena/_statements.py (74 lines of code) (raw):

"""Amazon Athena Module gathering all functions related to prepared statements."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Dict, List, Literal, cast

import boto3
from botocore.exceptions import ClientError

from awswrangler import _utils, exceptions
from awswrangler._config import apply_configs

if TYPE_CHECKING:
    from mypy_boto3_athena.client import AthenaClient

_logger: logging.Logger = logging.getLogger(__name__)


def _does_statement_exist(
    statement_name: str,
    workgroup: str,
    athena_client: "AthenaClient",
) -> bool:
    """Return whether a prepared statement with the given name exists in the workgroup.

    A ``ClientError`` whose error code is ``ResourceNotFoundException`` is
    interpreted as "does not exist"; any other ``ClientError`` is re-raised.
    """
    try:
        athena_client.get_prepared_statement(StatementName=statement_name, WorkGroup=workgroup)
    except ClientError as e:
        if e.response["Error"]["Code"] == "ResourceNotFoundException":
            return False
        # Bare ``raise`` re-raises the active exception without rebinding it.
        raise

    return True


@apply_configs
def create_prepared_statement(
    sql: str,
    statement_name: str,
    workgroup: str = "primary",
    mode: Literal["update", "error"] = "update",
    boto3_session: boto3.Session | None = None,
) -> None:
    """
    Create a SQL statement with the name statement_name to be run at a later time.

    The statement can include parameters represented by question marks.

    https://docs.aws.amazon.com/athena/latest/ug/sql-prepare.html

    Parameters
    ----------
    sql
        The query string for the prepared statement.
    statement_name
        The name of the prepared statement.
    workgroup
        The name of the workgroup to which the prepared statement belongs. Primary by default.
    mode
        Determines the behaviour if the prepared statement already exists:

        - ``update`` - updates the statement if it already exists
        - ``error`` - throws an error if the prepared statement already exists
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Raises
    ------
    exceptions.InvalidArgumentValue
        If ``mode`` is neither ``"update"`` nor ``"error"``.
    exceptions.AlreadyExists
        If the prepared statement already exists and ``mode`` is ``"error"``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.athena.create_prepared_statement(
    ...     sql="SELECT * FROM my_table WHERE name = ?",
    ...     statement_name="statement",
    ... )
    """
    if mode not in ("update", "error"):
        raise exceptions.InvalidArgumentValue("`mode` must be one of 'update' or 'error'.")

    athena_client = _utils.client("athena", session=boto3_session)

    if _does_statement_exist(statement_name, workgroup, athena_client):
        if mode == "error":
            raise exceptions.AlreadyExists(f"Prepared statement {statement_name} already exists.")

        _logger.info("Updating prepared statement %s", statement_name)
        athena_client.update_prepared_statement(
            StatementName=statement_name,
            WorkGroup=workgroup,
            QueryStatement=sql,
        )
    else:
        _logger.info("Creating prepared statement %s", statement_name)
        athena_client.create_prepared_statement(
            StatementName=statement_name,
            WorkGroup=workgroup,
            QueryStatement=sql,
        )


@apply_configs
def list_prepared_statements(
    workgroup: str = "primary", boto3_session: boto3.Session | None = None
) -> list[dict[str, Any]]:
    """
    List the prepared statements in the specified workgroup.

    Parameters
    ----------
    workgroup
        The name of the workgroup to which the prepared statement belongs. Primary by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
    list[dict[str, Any]]
        List of prepared statements in the workgroup.
        Each item is a dictionary with the keys ``StatementName`` and ``LastModifiedTime``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> statements = wr.athena.list_prepared_statements(workgroup="primary")
    """
    athena_client = _utils.client("athena", session=boto3_session)

    response = athena_client.list_prepared_statements(WorkGroup=workgroup)
    # Accumulate into a fresh list so the first response's own list is not extended in place.
    statements = list(response["PreparedStatements"])

    # Keep requesting pages for as long as the service hands back a continuation token.
    while "NextToken" in response:
        response = athena_client.list_prepared_statements(WorkGroup=workgroup, NextToken=response["NextToken"])
        statements.extend(response["PreparedStatements"])

    return cast(List[Dict[str, Any]], statements)


@apply_configs
def delete_prepared_statement(
    statement_name: str,
    workgroup: str = "primary",
    boto3_session: boto3.Session | None = None,
) -> None:
    """
    Delete the prepared statement with the specified name from the specified workgroup.

    https://docs.aws.amazon.com/athena/latest/ug/sql-deallocate-prepare.html

    Parameters
    ----------
    statement_name
        The name of the prepared statement.
    workgroup
        The name of the workgroup to which the prepared statement belongs. Primary by default.
        A falsy value (e.g. an empty string) is also treated as ``"primary"``.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.athena.delete_prepared_statement(
    ...     statement_name="statement",
    ... )
    """
    athena_client = _utils.client("athena", session=boto3_session)
    # Fall back to the default workgroup when a falsy value was supplied.
    workgroup = workgroup or "primary"

    _logger.info("Deallocating prepared statement %s", statement_name)
    athena_client.delete_prepared_statement(
        StatementName=statement_name,
        WorkGroup=workgroup,
    )