awswrangler/dynamodb/_read.py

"""Amazon DynamoDB Read Module (PRIVATE).""" from __future__ import annotations import itertools import logging import warnings from functools import wraps from typing import ( TYPE_CHECKING, Any, Callable, Dict, Iterator, List, NamedTuple, Sequence, TypeVar, cast, ) import boto3 import pyarrow as pa from boto3.dynamodb.conditions import ConditionBase, ConditionExpressionBuilder from boto3.dynamodb.types import Binary, TypeDeserializer, TypeSerializer from botocore.exceptions import ClientError from typing_extensions import Literal import awswrangler.pandas as pd from awswrangler import _data_types, _utils, exceptions from awswrangler._distributed import engine from awswrangler._executor import _BaseExecutor, _get_executor from awswrangler.distributed.ray import ray_get from awswrangler.dynamodb._utils import _deserialize_item, _serialize_item, execute_statement if TYPE_CHECKING: from mypy_boto3_dynamodb.client import DynamoDBClient from mypy_boto3_dynamodb.type_defs import TableAttributeValueTypeDef _logger: logging.Logger = logging.getLogger(__name__) _ItemsListType = List[Dict[str, "TableAttributeValueTypeDef"]] def _read_chunked(iterator: Iterator[dict[str, "TableAttributeValueTypeDef"]]) -> Iterator[pd.DataFrame]: for item in iterator: yield pd.DataFrame(item) def read_partiql_query( query: str, parameters: list[Any] | None = None, chunked: bool = False, boto3_session: boto3.Session | None = None, ) -> pd.DataFrame | Iterator[pd.DataFrame]: """Read data from a DynamoDB table via a PartiQL query. Parameters ---------- query The PartiQL statement. parameters The list of PartiQL parameters. These are applied to the statement in the order they are listed. chunked If `True` an iterable of DataFrames is returned. False by default. boto3_session The default boto3 session will be used if **boto3_session** is ``None``. Returns ------- Result as Pandas DataFrame. Examples -------- Select all contents from a table >>> import awswrangler as wr >>> wr.dynamodb.read_partiql_query( ... query="SELECT * FROM my_table WHERE title=? AND year=?", ... parameters=[title, year], ... ) Select specific columns from a table >>> wr.dynamodb.read_partiql_query( ... query="SELECT id FROM table" ... ) """ _logger.debug("Reading results for PartiQL query: '%s'", query) iterator: Iterator[dict[str, Any]] = execute_statement( # type: ignore[assignment] query, parameters=parameters, boto3_session=boto3_session ) if chunked: return _read_chunked(iterator=iterator) return pd.DataFrame([item for sublist in iterator for item in sublist]) def _get_invalid_kwarg(msg: str) -> str | None: """Detect which keyword argument contains reserved keywords based on given error message. Parameters ---------- msg Botocore client error message. Returns ------- Detected invalid keyword argument if any, None otherwise. """ for kwarg in ("ProjectionExpression", "KeyConditionExpression", "FilterExpression"): if msg.startswith(f"Invalid {kwarg}: Attribute name is a reserved keyword; reserved keyword: "): return kwarg return None # SEE: https://stackoverflow.com/a/72295070 # CustomCallable = TypeVar("CustomCallable", bound=Callable[[Any], Union[_ItemsListType, Iterator[_ItemsListType]]]) CustomCallable = TypeVar("CustomCallable", bound=Callable[..., Any]) def _handle_reserved_keyword_error(func: CustomCallable) -> CustomCallable: """Handle automatic replacement of DynamoDB reserved keywords. For reserved keywords reference: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ReservedWords.html. 
""" @wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: try: return func(*args, **kwargs) except ClientError as e: error_code, error_message = (e.response["Error"]["Code"], e.response["Error"]["Message"]) # Check caught error to verify its message kwarg = _get_invalid_kwarg(error_message) if (error_code == "ValidationException") and kwarg: reserved_keyword = error_message.split("keyword: ")[-1] sanitized_keyword = f"#{reserved_keyword}" kwargs[kwarg] = kwargs[kwarg].replace(reserved_keyword, sanitized_keyword) kwargs["ExpressionAttributeNames"] = { **kwargs.get("ExpressionAttributeNames", {}), sanitized_keyword: reserved_keyword, } # SEE: recursive approach guarantees that each reserved keyword will be properly replaced, # even if it will require as many calls as the reserved keywords involved (not so efficient...) return wrapper(*args, **kwargs) # Otherwise raise it raise e # SEE: https://github.com/python/mypy/issues/3157#issue-221120895 return cast(CustomCallable, wrapper) def _convert_items( items: _ItemsListType, as_dataframe: bool, arrow_kwargs: dict[str, Any], ) -> pd.DataFrame | _ItemsListType: return ( _utils.table_refs_to_df( [ _utils.list_to_arrow_table( # Convert DynamoDB "Binary" type to native Python data type mapping=[ {k: v.value if isinstance(v, Binary) else v for k, v in d.items()} # type: ignore[attr-defined] for d in items ], schema=arrow_kwargs.pop("schema", None), ) ], arrow_kwargs, ) if as_dataframe else items ) def _convert_items_chunked( items_iterator: Iterator[_ItemsListType], as_dataframe: bool, arrow_kwargs: dict[str, Any], ) -> Iterator[pd.DataFrame] | Iterator[_ItemsListType]: for items in items_iterator: yield _convert_items(items, as_dataframe, arrow_kwargs) def _read_scan_chunked( dynamodb_client: "DynamoDBClient" | None, as_dataframe: bool, kwargs: dict[str, Any], schema: pa.Schema | None = None, segment: int | None = None, ) -> Iterator[pa.Table] | Iterator[_ItemsListType]: # SEE: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan client_dynamodb = dynamodb_client if dynamodb_client else _utils.client(service_name="dynamodb") deserializer = TypeDeserializer() next_token: str | None = "init_token" # Dummy token total_items = 0 kwargs = dict(kwargs) if segment is not None: kwargs["Segment"] = segment while next_token: response = _handle_reserved_keyword_error(client_dynamodb.scan)(**kwargs) # Unlike a resource, the DynamoDB client returns serialized results, so they must be deserialized # Additionally, the DynamoDB "Binary" type is converted to a native Python data type # SEE: https://boto3.amazonaws.com/v1/documentation/api/latest/_modules/boto3/dynamodb/types.html items = [ {k: v["B"] if list(v.keys())[0] == "B" else deserializer.deserialize(v) for k, v in d.items()} for d in response.get("Items", []) ] total_items += len(items) yield _utils.list_to_arrow_table(mapping=items, schema=schema) if as_dataframe else items if ("Limit" in kwargs) and (total_items >= kwargs["Limit"]): break next_token = response.get("LastEvaluatedKey", None) if next_token: kwargs["ExclusiveStartKey"] = next_token @engine.dispatch_on_engine @_utils.retry( ex=ClientError, ex_code="ProvisionedThroughputExceededException", ) def _read_scan( dynamodb_client: "DynamoDBClient" | None, as_dataframe: bool, kwargs: dict[str, Any], schema: pa.Schema | None, segment: int, ) -> pa.Table | _ItemsListType: items_iterator: Iterator[_ItemsListType] = _read_scan_chunked(dynamodb_client, False, kwargs, None, segment) items = 

def _read_query_chunked(table_name: str, dynamodb_client: "DynamoDBClient", **kwargs: Any) -> Iterator[_ItemsListType]:
    next_token: str | None = "init_token"  # Dummy token
    total_items = 0

    # Handle pagination
    while next_token:
        response = dynamodb_client.query(TableName=table_name, **kwargs)
        items = response.get("Items", [])
        total_items += len(items)
        yield [_deserialize_item(item) for item in items]

        if ("Limit" in kwargs) and (total_items >= kwargs["Limit"]):
            break

        next_token = response.get("LastEvaluatedKey", None)
        if next_token:
            kwargs["ExclusiveStartKey"] = next_token


@_handle_reserved_keyword_error
def _read_query(
    table_name: str, dynamodb_client: "DynamoDBClient", chunked: bool, **kwargs: Any
) -> _ItemsListType | Iterator[_ItemsListType]:
    items_iterator = _read_query_chunked(table_name, dynamodb_client, **kwargs)

    if chunked:
        return items_iterator
    else:
        return list(itertools.chain.from_iterable(items_iterator))


def _read_batch_items_chunked(
    table_name: str, dynamodb_client: "DynamoDBClient" | None, **kwargs: Any
) -> Iterator[_ItemsListType]:
    dynamodb_client = dynamodb_client if dynamodb_client else _utils.client("dynamodb")
    deserializer = TypeDeserializer()

    response = dynamodb_client.batch_get_item(RequestItems={table_name: kwargs})
    yield [_deserialize_item(d, deserializer) for d in response.get("Responses", {table_name: []}).get(table_name, [])]

    # SEE: handle possible unprocessed keys. As suggested in Boto3 docs,
    # this approach should involve exponential backoff, but this should be
    # already managed by AWS SDK itself, as stated
    # [here](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html)
    while response["UnprocessedKeys"]:
        kwargs["Keys"] = response["UnprocessedKeys"][table_name]["Keys"]
        response = dynamodb_client.batch_get_item(RequestItems={table_name: kwargs})
        yield [
            _deserialize_item(d, deserializer)
            for d in response.get("Responses", {table_name: []}).get(table_name, [])
        ]


@_handle_reserved_keyword_error
def _read_batch_items(
    table_name: str, dynamodb_client: "DynamoDBClient" | None, chunked: bool, **kwargs: Any
) -> _ItemsListType | Iterator[_ItemsListType]:
    items_iterator = _read_batch_items_chunked(table_name, dynamodb_client, **kwargs)

    if chunked:
        return items_iterator
    else:
        return list(itertools.chain.from_iterable(items_iterator))


@_handle_reserved_keyword_error
def _read_item(
    table_name: str,
    dynamodb_client: "DynamoDBClient",
    chunked: bool = False,
    **kwargs: Any,
) -> _ItemsListType | Iterator[_ItemsListType]:
    item = dynamodb_client.get_item(TableName=table_name, **kwargs).get("Item", {})
    item_list: _ItemsListType = [_deserialize_item(item)]

    return [item_list] if chunked else item_list
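
# NOTE: illustrative sketch of the batch read above, with hypothetical table and key names. The
# RequestItems payload is already in the low-level, serialized DynamoDB format, roughly
#
#     >>> client.batch_get_item(
#     ...     RequestItems={
#     ...         "my-table": {
#     ...             "Keys": [{"pk": {"S": "pv_1"}}, {"pk": {"S": "pv_2"}}],
#     ...             "ConsistentRead": False,
#     ...         }
#     ...     }
#     ... )
#
# Keys reported back under UnprocessedKeys are simply re-requested in a loop, relying on the SDK's
# built-in retry handling for backoff.
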

def _read_items_scan(
    table_name: str,
    as_dataframe: bool,
    arrow_kwargs: dict[str, Any],
    use_threads: bool | int,
    dynamodb_client: "DynamoDBClient",
    chunked: bool,
    **kwargs: Any,
) -> pd.DataFrame | Iterator[pd.DataFrame] | _ItemsListType | Iterator[_ItemsListType]:
    kwargs["TableName"] = table_name
    schema = arrow_kwargs.pop("schema", None)

    if chunked:
        _logger.debug("Scanning DynamoDB table %s and returning results in an iterator", table_name)
        scan_iterator = _read_scan_chunked(dynamodb_client, as_dataframe, kwargs, schema)
        if as_dataframe:
            return (_utils.table_refs_to_df([items], arrow_kwargs) for items in scan_iterator)
        return scan_iterator

    # Use Parallel Scan
    executor: _BaseExecutor = _get_executor(use_threads=use_threads)
    total_segments = _utils.ensure_worker_or_thread_count(use_threads=use_threads)
    kwargs["TotalSegments"] = total_segments

    _logger.debug("Scanning DynamoDB table %s with %d segments", table_name, total_segments)
    items = executor.map(
        _read_scan,
        dynamodb_client,
        itertools.repeat(as_dataframe),
        itertools.repeat(kwargs),
        itertools.repeat(schema),
        range(total_segments),
    )

    if as_dataframe:
        return _utils.table_refs_to_df(items, arrow_kwargs)
    return list(itertools.chain(*ray_get(items)))


def _read_items(
    table_name: str,
    as_dataframe: bool,
    arrow_kwargs: dict[str, Any],
    use_threads: bool | int,
    chunked: bool,
    dynamodb_client: "DynamoDBClient",
    **kwargs: Any,
) -> pd.DataFrame | Iterator[pd.DataFrame] | _ItemsListType | Iterator[_ItemsListType]:
    # Extract 'Keys', 'IndexName' and 'Limit' from provided kwargs: if needed, they will be reinserted later on
    keys = kwargs.pop("Keys", None)
    index = kwargs.pop("IndexName", None)
    limit = kwargs.pop("Limit", None)

    # Conditionally define optimal reading strategy
    use_get_item = (keys is not None) and (len(keys) == 1)
    use_batch_get_item = (keys is not None) and (len(keys) > 1)
    use_query = (keys is None) and ("KeyConditionExpression" in kwargs)

    # Single Item
    if use_get_item:
        kwargs["Key"] = keys[0]
        items = _read_item(table_name, dynamodb_client, chunked, **kwargs)
    # Batch of Items
    elif use_batch_get_item:
        kwargs["Keys"] = keys
        items = _read_batch_items(table_name, dynamodb_client, chunked, **kwargs)
    else:
        if limit:
            kwargs["Limit"] = limit
            _logger.debug("`max_items_evaluated` argument detected, setting use_threads to False")
            use_threads = False

        if index:
            kwargs["IndexName"] = index

        if use_query:
            # Query
            _logger.debug("Query DynamoDB table %s", table_name)
            items = _read_query(table_name, dynamodb_client, chunked, **kwargs)
        else:
            # Last resort: use Scan
            warnings.warn(
                f"Attempting DynamoDB Scan operation with arguments:\n{kwargs}",
                UserWarning,
            )
            return _read_items_scan(
                table_name=table_name,
                as_dataframe=as_dataframe,
                arrow_kwargs=arrow_kwargs,
                use_threads=use_threads,
                dynamodb_client=dynamodb_client,
                chunked=chunked,
                **kwargs,
            )

    if chunked:
        return _convert_items_chunked(
            items_iterator=cast(Iterator[_ItemsListType], items), as_dataframe=as_dataframe, arrow_kwargs=arrow_kwargs
        )
    else:
        return _convert_items(items=cast(_ItemsListType, items), as_dataframe=as_dataframe, arrow_kwargs=arrow_kwargs)


class _ExpressionTuple(NamedTuple):
    condition_expression: str
    attribute_name_placeholders: dict[str, str]
    attribute_value_placeholders: dict[str, Any]


def _convert_condition_base_to_expression(
    key_condition_expression: ConditionBase, is_key_condition: bool, serializer: TypeSerializer
) -> _ExpressionTuple:
    builder = ConditionExpressionBuilder()

    # Use different namespaces for key and filter conditions
    if is_key_condition:
        builder._name_placeholder = "kn"
        builder._value_placeholder = "kv"

    expression = builder.build_expression(key_condition_expression, is_key_condition=is_key_condition)
    return _ExpressionTuple(
        condition_expression=expression.condition_expression,
        attribute_name_placeholders=expression.attribute_name_placeholders,
        attribute_value_placeholders=_serialize_item(expression.attribute_value_placeholders, serializer=serializer),
    )
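
# NOTE: illustrative sketch of the conversion above, assuming a hypothetical partition key "pk". Given
#
#     >>> from boto3.dynamodb.conditions import Key
#     >>> cond = Key("pk").eq("my-value")
#
# the helper returns an _ExpressionTuple along the lines of
#
#     condition_expression          "#kn0 = :kv0"
#     attribute_name_placeholders   {"#kn0": "pk"}
#     attribute_value_placeholders  {":kv0": {"S": "my-value"}}  # serialized to the DynamoDB wire format
#
# The "kn"/"kv" prefixes keep key-condition placeholders from clashing with filter-condition ones,
# which use boto3's default "n"/"v" namespaces.
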

@_utils.validate_distributed_kwargs(
    unsupported_kwargs=["boto3_session", "dtype_backend"],
)
def read_items(  # noqa: PLR0912
    table_name: str,
    index_name: str | None = None,
    partition_values: Sequence[Any] | None = None,
    sort_values: Sequence[Any] | None = None,
    filter_expression: ConditionBase | str | None = None,
    key_condition_expression: ConditionBase | str | None = None,
    expression_attribute_names: dict[str, str] | None = None,
    expression_attribute_values: dict[str, Any] | None = None,
    consistent: bool = False,
    columns: Sequence[str] | None = None,
    allow_full_scan: bool = False,
    max_items_evaluated: int | None = None,
    dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
    as_dataframe: bool = True,
    chunked: bool = False,
    use_threads: bool | int = True,
    boto3_session: boto3.Session | None = None,
    pyarrow_additional_kwargs: dict[str, Any] | None = None,
) -> pd.DataFrame | Iterator[pd.DataFrame] | _ItemsListType | Iterator[_ItemsListType]:
    """Read items from given DynamoDB table.

    This function aims to gracefully handle (some of) the complexity of read actions
    available in Boto3 towards a DynamoDB table, abstracting it away while providing
    a single, unified entry point.

    Under the hood, it wraps all four available read actions: `get_item`, `batch_get_item`,
    `query` and `scan`.

    Warning
    -------
    To avoid a potentially costly Scan operation, please make sure to pass arguments such as
    `partition_values` or `max_items_evaluated`. Note that `filter_expression` is applied AFTER a Scan.

    Note
    ----
    The number of Parallel Scan segments is based on the `use_threads` argument.
    A parallel scan with a large number of workers could consume all the provisioned throughput
    of the table or index.
    See: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan

    Note
    ----
    If `max_items_evaluated` is specified, then `use_threads=False` is enforced. This is because
    it's not possible to limit the number of items in a Query/Scan operation across threads.

    Parameters
    ----------
    table_name
        DynamoDB table name.
    index_name
        Name of the secondary global or local index on the table. Defaults to None.
    partition_values
        Partition key values to retrieve. Defaults to None.
    sort_values
        Sort key values to retrieve. Defaults to None.
    filter_expression
        Filter expression as string or combinations of boto3.dynamodb.conditions.Attr conditions. Defaults to None.
    key_condition_expression
        Key condition expression as string or combinations of boto3.dynamodb.conditions.Key conditions.
        Defaults to None.
    expression_attribute_names
        Mapping of placeholder and target attributes. Defaults to None.
    expression_attribute_values
        Mapping of placeholder and target values. Defaults to None.
    consistent
        If True, ensure that the performed read operation is strongly consistent, otherwise eventually consistent.
        Defaults to False.
    columns
        Attributes to retain in the returned items. Defaults to None (all attributes).
    allow_full_scan
        If True, allow full table scan without any filtering. Defaults to False.
    max_items_evaluated
        Limit the number of items evaluated in case of query or scan operations. Defaults to None (all matching items).
        When set, `use_threads` is enforced to False.
    dtype_backend
        Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays;
        nullable dtypes are used for all dtypes that have a nullable implementation when
        "numpy_nullable" is set, and pyarrow is used for all dtypes if "pyarrow" is set.

        The dtype_backends are still experimental. The "pyarrow" backend is only supported with Pandas 2.0 or above.
    as_dataframe
        If True, return items as pd.DataFrame, otherwise as list/dict. Defaults to True.
    chunked
        If `True`, an iterable of DataFrames/lists is returned. False by default.
    use_threads
        Used for Parallel Scan requests.
        True (default) to enable concurrency, False to disable multiple threads.
        If enabled, os.cpu_count() is used as the max number of threads.
        If an integer is provided, the specified number is used.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.
    pyarrow_additional_kwargs
        Forwarded to the `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
        Valid values include "split_blocks", "self_destruct", "ignore_metadata".
        e.g. pyarrow_additional_kwargs={'split_blocks': True}.

    Raises
    ------
    exceptions.InvalidArgumentType
        When the specified table also has a sort key but only the partition values are specified.
    exceptions.InvalidArgumentCombination
        When both partition and sort values sequences are specified but they have different lengths,
        or when the provided parameters are not informative enough to proceed with a read operation.

    Returns
    -------
    ``pd.DataFrame | list[dict[str, Any]] | Iterable[pd.DataFrame] | Iterable[list[dict[str, Any]]]``
        A DataFrame containing the retrieved items, or a list of the returned items as dictionaries.
        Alternatively, the return type can be an iterable of either type when `chunked=True`.

    Examples
    --------
    Reading 5 random items from a table

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(table_name='my-table', max_items_evaluated=5)

    Strongly-consistent reading of a given partition value from a table

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(table_name='my-table', partition_values=['my-value'], consistent=True)

    Reading items pairwise-identified by partition and sort values, from a table with a composite primary key

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     partition_values=['pv_1', 'pv_2'],
    ...     sort_values=['sv_1', 'sv_2']
    ... )

    Reading items while retaining only specified attributes, automatically handling possible collision
    with DynamoDB reserved keywords

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     partition_values=['my-value'],
    ...     columns=['connection', 'other_col']  # connection is a reserved keyword, managed under the hood!
    ... )

    Reading all items from a table explicitly allowing full scan

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(table_name='my-table', allow_full_scan=True)

    Reading items matching a KeyConditionExpression expressed with boto3.dynamodb.conditions.Key

    >>> import awswrangler as wr
    >>> from boto3.dynamodb.conditions import Key
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     key_condition_expression=(Key('key_1').eq('val_1') & Key('key_2').eq('val_2'))
    ... )

    Same as above, but with KeyConditionExpression as string

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     key_condition_expression='key_1 = :v1 and key_2 = :v2',
    ...     expression_attribute_values={':v1': 'val_1', ':v2': 'val_2'},
    ... )

    Reading items matching a FilterExpression expressed with boto3.dynamodb.conditions.Attr.
    Note that FilterExpression is applied AFTER a Scan operation

    >>> import awswrangler as wr
    >>> from boto3.dynamodb.conditions import Attr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     filter_expression=Attr('my_attr').eq('this-value')
    ... )

    Same as above, but with FilterExpression as string

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     filter_expression='my_attr = :v',
    ...     expression_attribute_values={':v': 'this-value'}
    ... )

    Reading items involving an attribute which collides with DynamoDB reserved keywords

    >>> import awswrangler as wr
    >>> df = wr.dynamodb.read_items(
    ...     table_name='my-table',
    ...     filter_expression='#operator = :v',
    ...     expression_attribute_names={'#operator': 'operator'},
    ...     expression_attribute_values={':v': 'this-value'}
    ... )
    """
    arrow_kwargs = _data_types.pyarrow2pandas_defaults(
        use_threads=use_threads, kwargs=pyarrow_additional_kwargs, dtype_backend=dtype_backend
    )

    # Extract key schema
    dynamodb_client = _utils.client(service_name="dynamodb", session=boto3_session)
    serializer = TypeSerializer()
    table_key_schema = dynamodb_client.describe_table(TableName=table_name)["Table"]["KeySchema"]

    # Detect sort key, if any
    if len(table_key_schema) == 1:
        partition_key, sort_key = table_key_schema[0]["AttributeName"], None
    else:
        partition_key, sort_key = (
            next(filter(lambda x: x["KeyType"] == "HASH", table_key_schema))["AttributeName"],
            next(filter(lambda x: x["KeyType"] == "RANGE", table_key_schema))["AttributeName"],
        )

    # Build kwargs shared by read methods
    kwargs: dict[str, Any] = {"ConsistentRead": consistent}

    if partition_values:
        if sort_key is None:
            keys = [{partition_key: serializer.serialize(pv)} for pv in partition_values]
        else:
            if not sort_values:
                raise exceptions.InvalidArgumentType(
                    f"Kwarg sort_values must be specified: table {table_name} has {sort_key} as sort key."
                )
            if len(sort_values) != len(partition_values):
                raise exceptions.InvalidArgumentCombination("Partition and sort values must have the same length.")
            keys = [
                {partition_key: serializer.serialize(pv), sort_key: serializer.serialize(sv)}
                for pv, sv in zip(partition_values, sort_values)
            ]
        kwargs["Keys"] = keys

    if index_name:
        kwargs["IndexName"] = index_name

    if key_condition_expression:
        if isinstance(key_condition_expression, str):
            kwargs["KeyConditionExpression"] = key_condition_expression
        else:
            expression_tuple = _convert_condition_base_to_expression(
                key_condition_expression, is_key_condition=True, serializer=serializer
            )
            kwargs["KeyConditionExpression"] = expression_tuple.condition_expression
            kwargs["ExpressionAttributeNames"] = {
                **kwargs.get("ExpressionAttributeNames", {}),
                **expression_tuple.attribute_name_placeholders,
            }
            kwargs["ExpressionAttributeValues"] = {
                **kwargs.get("ExpressionAttributeValues", {}),
                **expression_tuple.attribute_value_placeholders,
            }

    if filter_expression:
        if isinstance(filter_expression, str):
            kwargs["FilterExpression"] = filter_expression
        else:
            expression_tuple = _convert_condition_base_to_expression(
                filter_expression, is_key_condition=False, serializer=serializer
            )
            kwargs["FilterExpression"] = expression_tuple.condition_expression
            kwargs["ExpressionAttributeNames"] = {
                **kwargs.get("ExpressionAttributeNames", {}),
                **expression_tuple.attribute_name_placeholders,
            }
            kwargs["ExpressionAttributeValues"] = {
                **kwargs.get("ExpressionAttributeValues", {}),
                **expression_tuple.attribute_value_placeholders,
            }

    if columns:
        kwargs["ProjectionExpression"] = ", ".join(columns)

    if expression_attribute_names:
        kwargs["ExpressionAttributeNames"] = {
            **kwargs.get("ExpressionAttributeNames", {}),
            **expression_attribute_names,
        }

    if expression_attribute_values:
        kwargs["ExpressionAttributeValues"] = {
            **kwargs.get("ExpressionAttributeValues", {}),
            **_serialize_item(expression_attribute_values, serializer),
        }

    if max_items_evaluated:
        kwargs["Limit"] = max_items_evaluated

    _logger.debug("DynamoDB scan/query kwargs: %s", kwargs)

    # If kwargs are sufficiently informative, proceed with actual read op
    if any((partition_values, key_condition_expression, filter_expression, allow_full_scan, max_items_evaluated)):
        return _read_items(
            table_name=table_name,
            as_dataframe=as_dataframe,
            arrow_kwargs=arrow_kwargs,
            use_threads=use_threads,
            chunked=chunked,
            dynamodb_client=dynamodb_client,
            **kwargs,
        )

    # Raise otherwise
    _args = (
        "partition_values",
        "key_condition_expression",
        "filter_expression",
        "allow_full_scan",
        "max_items_evaluated",
    )
    raise exceptions.InvalidArgumentCombination(f"Please provide at least one of these arguments: {', '.join(_args)}.")