awswrangler/cleanrooms/_utils.py (26 lines of code) (raw):

"""Utilities Module for Amazon Clean Rooms.""" from __future__ import annotations import logging import time from typing import TYPE_CHECKING import boto3 from awswrangler import _utils, exceptions if TYPE_CHECKING: from mypy_boto3_cleanrooms.type_defs import GetProtectedQueryOutputTypeDef _QUERY_FINAL_STATES: list[str] = ["CANCELLED", "FAILED", "SUCCESS", "TIMED_OUT"] _QUERY_WAIT_POLLING_DELAY: float = 2 # SECONDS _logger: logging.Logger = logging.getLogger(__name__) def wait_query( membership_id: str, query_id: str, boto3_session: boto3.Session | None = None ) -> "GetProtectedQueryOutputTypeDef": """Wait for the Clean Rooms protected query to end. Parameters ---------- membership_id Membership ID query_id Protected query execution ID boto3_session The default boto3 session will be used if **boto3_session** is ``None``. Returns ------- ``Dict[str, Any]`` Dictionary with the get_protected_query response. Raises ------ exceptions.QueryFailed Raises exception with error message if protected query is cancelled, times out or fails. Examples -------- >>> import awswrangler as wr >>> res = wr.cleanrooms.wait_query(membership_id='membership-id', query_id='query-id') """ client_cleanrooms = _utils.client(service_name="cleanrooms", session=boto3_session) state = "SUBMITTED" while state not in _QUERY_FINAL_STATES: time.sleep(_QUERY_WAIT_POLLING_DELAY) response = client_cleanrooms.get_protected_query( membershipIdentifier=membership_id, protectedQueryIdentifier=query_id ) state = response["protectedQuery"].get("status") # type: ignore[assignment] _logger.debug("state: %s", state) if state != "SUCCESS": raise exceptions.QueryFailed(response["protectedQuery"].get("Error")) return response