awswrangler/catalog/_get.py (411 lines of code) (raw):
"""AWS Glue Catalog Get Module."""
from __future__ import annotations
import base64
import itertools
import logging
from typing import TYPE_CHECKING, Any, Dict, Iterator, cast
import boto3
import botocore.exceptions
import awswrangler.pandas as pd
from awswrangler import _utils, exceptions
from awswrangler._config import apply_configs
from awswrangler.catalog._utils import _catalog_id, _extract_dtypes_from_table_details
if TYPE_CHECKING:
from mypy_boto3_glue.type_defs import GetPartitionsResponseTypeDef
_logger: logging.Logger = logging.getLogger(__name__)
def _get_table_input(
    database: str,
    table: str,
    boto3_session: boto3.Session | None,
    catalog_id: str | None = None,
) -> dict[str, Any] | None:
    """Fetch a table from Glue and keep only an allow-listed subset of its keys.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    boto3_session
        Session used to build the Glue client.
    catalog_id
        Optional Data Catalog ID forwarded through ``_catalog_id``.

    Returns
    -------
        The filtered table definition, or ``None`` when Glue raises ``EntityNotFoundException``.
    """
    # Only these keys of the ``get_table`` response survive; everything else is dropped.
    # NOTE(review): presumably the keys accepted by Glue's ``TableInput`` - confirm against callers.
    kept_keys = (
        "Name",
        "Description",
        "Owner",
        "LastAccessTime",
        "LastAnalyzedTime",
        "Retention",
        "StorageDescriptor",
        "PartitionKeys",
        "ViewOriginalText",
        "ViewExpandedText",
        "TableType",
        "Parameters",
        "TargetTable",
    )
    glue = _utils.client("glue", session=boto3_session)
    request: dict[str, Any] = _catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table)
    try:
        found = glue.get_table(**request)
    except glue.exceptions.EntityNotFoundException:
        return None
    return {key: value for key, value in found["Table"].items() if key in kept_keys}
def _append_partitions(partitions_values: dict[str, list[str]], response: "GetPartitionsResponseTypeDef") -> str | None:
    """Merge one ``get_partitions`` response page into ``partitions_values``.

    Parameters
    ----------
    partitions_values
        Accumulator mapping partition location to its list of values. Updated in place.
    response
        A single page returned by the Glue ``get_partitions`` call.

    Returns
    -------
        The page's ``NextToken`` if present, otherwise ``None``. ``None`` is also returned
        when the page is missing or carries no ``"Partitions"`` key, which ends pagination.
    """
    _logger.debug("response: %s", response)
    # Validate the page before reading from it, so a missing page ends pagination
    # instead of failing on the attribute access.
    if response is None or "Partitions" not in response:
        return None
    for partition in response["Partitions"]:
        location: str | None = partition["StorageDescriptor"].get("Location")
        # Partitions without a location cannot be keyed, so they are skipped.
        if location is not None:
            partitions_values[location] = partition["Values"]
    return response.get("NextToken")
def _get_partitions(
    database: str,
    table: str,
    expression: str | None = None,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, list[str]]:
    """Collect every partition of a table by following ``NextToken`` pagination.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    expression
        Optional filter forwarded as the ``Expression`` argument of ``get_partitions``.
    catalog_id
        Optional Data Catalog ID forwarded through ``_catalog_id``.
    boto3_session
        Session used to build the Glue client.

    Returns
    -------
        Dictionary mapping each partition location to its list of partition values.
    """
    glue = _utils.client("glue", session=boto3_session)
    request: dict[str, Any] = _catalog_id(
        catalog_id=catalog_id,
        DatabaseName=database,
        TableName=table,
        MaxResults=1_000,
        Segment={"SegmentNumber": 0, "TotalSegments": 1},
        ExcludeColumnSchema=True,
    )
    if expression is not None:
        request["Expression"] = expression
    collected: dict[str, list[str]] = {}
    _logger.debug("Starting pagination...")
    while True:
        page = glue.get_partitions(**request)
        # ``_append_partitions`` returns None once there is nothing more to fetch.
        if _append_partitions(partitions_values=collected, response=page) is None:
            break
        request["NextToken"] = page["NextToken"]
    _logger.debug("Pagination done.")
    return collected
@apply_configs
def get_table_types(
    database: str,
    table: str,
    catalog_id: str | None = None,
    filter_iceberg_current: bool = False,
    boto3_session: boto3.Session | None = None,
) -> dict[str, str] | None:
    """Get all columns and types from a table.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    filter_iceberg_current
        If True, returns only current iceberg fields (fields marked with iceberg.field.current: true).
        Otherwise, returns all fields. False by default (return all fields).
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        If table exists, a dictionary like {'col name': 'col data type'}. Otherwise None.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.get_table_types(database='default', table='my_table')
    {'col0': 'int', 'col1': 'double'}

    """
    glue = _utils.client(service_name="glue", session=boto3_session)
    request = _catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table)
    try:
        details = glue.get_table(**request)
    except glue.exceptions.EntityNotFoundException:
        # A missing table is reported to the caller as None rather than as an error.
        return None
    return _extract_dtypes_from_table_details(response=details, filter_iceberg_current=filter_iceberg_current)
def get_databases(
    catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> Iterator[dict[str, Any]]:
    """Get an iterator of databases.

    Parameters
    ----------
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Iterator of Databases.

    Examples
    --------
    >>> import awswrangler as wr
    >>> dbs = wr.catalog.get_databases()

    """
    glue = _utils.client("glue", session=boto3_session)
    pages = glue.get_paginator("get_databases").paginate(**_catalog_id(catalog_id=catalog_id))
    for page in pages:
        # Each page carries its databases under "DatabaseList"; emit them one by one.
        yield from (cast(Dict[str, Any], entry) for entry in page["DatabaseList"])
@apply_configs
def databases(
    limit: int = 100, catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> pd.DataFrame:
    """Get a Pandas DataFrame with all listed databases.

    Parameters
    ----------
    limit
        Max number of databases to be returned.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Pandas DataFrame with one row per database and the columns ``Database`` and ``Description``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> df_dbs = wr.catalog.databases()

    """
    names: list[str] = []
    descriptions: list[str] = []
    listing = get_databases(catalog_id=catalog_id, boto3_session=boto3_session)
    # islice stops consuming the iterator as soon as ``limit`` entries were taken.
    for entry in itertools.islice(listing, limit):
        names.append(entry["Name"])
        descriptions.append(entry.get("Description", ""))
    return pd.DataFrame(data={"Database": names, "Description": descriptions})
@apply_configs
def get_tables(
    catalog_id: str | None = None,
    database: str | None = None,
    name_contains: str | None = None,
    name_prefix: str | None = None,
    name_suffix: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> Iterator[dict[str, Any]]:
    """Get an iterator of tables.

    Note
    ----
    Please, do not filter using name_contains and name_prefix/name_suffix at the same time.
    Only name_prefix and name_suffix can be combined together.

    Parameters
    ----------
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    database
        Database name. If ``None``, the tables of every database in the catalog are listed.
    name_contains
        Select by a specific string on table name
    name_prefix
        Select by a specific prefix on table name
    name_suffix
        Select by a specific suffix on table name
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Iterator of tables.

    Raises
    ------
    exceptions.InvalidArgumentCombination
        If name_contains, name_prefix and name_suffix are all provided.

    Examples
    --------
    >>> import awswrangler as wr
    >>> tables = wr.catalog.get_tables()

    """
    client_glue = _utils.client(service_name="glue", session=boto3_session)
    paginator = client_glue.get_paginator("get_tables")
    args: dict[str, str] = {}
    # NOTE: only the all-three combination is rejected. When name_contains is given with
    # just one of name_prefix/name_suffix, the name_contains pattern is the one applied.
    if (name_prefix is not None) and (name_suffix is not None) and (name_contains is not None):
        raise exceptions.InvalidArgumentCombination(
            "Please, do not filter using name_contains and "
            "name_prefix/name_suffix at the same time. Only "
            "name_prefix and name_suffix can be combined together."
        )
    if (name_prefix is not None) and (name_suffix is not None):
        args["Expression"] = f"{name_prefix}*{name_suffix}"
    elif name_contains is not None:
        args["Expression"] = f"*{name_contains}*"
    elif name_prefix is not None:
        args["Expression"] = f"{name_prefix}*"
    elif name_suffix is not None:
        args["Expression"] = f"*{name_suffix}"
    if database is not None:
        dbs: list[str] = [database]
    else:
        # Forward the caller's session so databases are listed with the same
        # credentials/region as the tables themselves.
        dbs = [x["Name"] for x in get_databases(catalog_id=catalog_id, boto3_session=boto3_session)]
    for db in dbs:
        args["DatabaseName"] = db
        response_iterator = paginator.paginate(**_catalog_id(catalog_id=catalog_id, **args))
        try:
            for page in response_iterator:
                for tbl in page["TableList"]:
                    yield cast(Dict[str, Any], tbl)
        except client_glue.exceptions.EntityNotFoundException:
            # Glue reported the database as not found: skip it and keep going with the rest.
            continue
@apply_configs
def tables(
    limit: int = 100,
    catalog_id: str | None = None,
    database: str | None = None,
    search_text: str | None = None,
    name_contains: str | None = None,
    name_prefix: str | None = None,
    name_suffix: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> pd.DataFrame:
    """Get a DataFrame with tables filtered by a search term, prefix, suffix.

    Parameters
    ----------
    limit
        Max number of tables to be returned.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    database
        Database name.
    search_text
        Select only tables with the given string in table's properties.
        When provided, the database and name filters are applied locally to the search results.
    name_contains
        Select by a specific string on table name
    name_prefix
        Select by a specific prefix on table name
    name_suffix
        Select by a specific suffix on table name
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Pandas DataFrame filled by formatted table information
        (columns: Database, Table, Description, TableType, Columns, Partitions).

    Examples
    --------
    >>> import awswrangler as wr
    >>> df_tables = wr.catalog.tables()

    """
    table_iter: Iterator[dict[str, Any]]
    if search_text is None:
        table_iter = get_tables(
            catalog_id=catalog_id,
            database=database,
            name_contains=name_contains,
            name_prefix=name_prefix,
            name_suffix=name_suffix,
            boto3_session=boto3_session,
        )
    else:
        # Filter the search results lazily so pagination stops once ``limit``
        # matching tables were collected, instead of fetching every page first.
        table_iter = (
            tbl
            for tbl in search_tables(text=search_text, catalog_id=catalog_id, boto3_session=boto3_session)
            if (database is None or tbl["DatabaseName"] == database)
            and (name_contains is None or name_contains in tbl["Name"])
            and (name_prefix is None or tbl["Name"].startswith(name_prefix))
            and (name_suffix is None or tbl["Name"].endswith(name_suffix))
        )
    tbls: list[dict[str, Any]] = list(itertools.islice(table_iter, limit))

    df_dict: dict[str, list[str]] = {
        "Database": [],
        "Table": [],
        "Description": [],
        "TableType": [],
        "Columns": [],
        "Partitions": [],
    }
    for tbl in tbls:
        df_dict["Database"].append(tbl["DatabaseName"])
        df_dict["Table"].append(tbl["Name"])
        df_dict["Description"].append(tbl.get("Description", ""))
        df_dict["TableType"].append(tbl.get("TableType", ""))
        try:
            columns = tbl["StorageDescriptor"]["Columns"]
            df_dict["Columns"].append(", ".join([x["Name"] for x in columns]))
        except KeyError:
            # Tables without a storage descriptor / column list get an empty cell.
            df_dict["Columns"].append("")
        if "PartitionKeys" in tbl:
            df_dict["Partitions"].append(", ".join([x["Name"] for x in tbl["PartitionKeys"]]))
        else:
            df_dict["Partitions"].append("")
    return pd.DataFrame(data=df_dict)
def search_tables(
    text: str, catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> Iterator[dict[str, Any]]:
    """Get an iterator of tables filtered by a search string.

    Parameters
    ----------
    text
        Select only tables with the given string in table's properties.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Iterator of tables.

    Examples
    --------
    >>> import awswrangler as wr
    >>> tables = wr.catalog.search_tables(text='my_property')

    """
    glue = _utils.client("glue", session=boto3_session)
    request: dict[str, Any] = _catalog_id(catalog_id=catalog_id, SearchText=text)
    while True:
        page = glue.search_tables(**request)
        for entry in page["TableList"]:
            yield cast(Dict[str, Any], entry)
        # A page without "NextToken" is the last one.
        if "NextToken" not in page:
            break
        request["NextToken"] = page["NextToken"]
@apply_configs
def table(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> pd.DataFrame:
    """Get table details as Pandas DataFrame.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Pandas DataFrame with one row per column (regular columns first, then partition keys)
        and the columns ``Column Name``, ``Type``, ``Partition`` and ``Comment``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> df_table = wr.catalog.table(database='default', table='my_table')

    """
    glue = _utils.client(service_name="glue", session=boto3_session)
    details = glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table))["Table"]

    regular_cols = details["StorageDescriptor"].get("Columns", {}) if "StorageDescriptor" in details else []
    partition_cols = details["PartitionKeys"] if "PartitionKeys" in details else []
    # Pair every column with its "is partition key" flag, keeping regular columns first.
    flagged = [(col, False) for col in regular_cols] + [(col, True) for col in partition_cols]

    frame_data: dict[str, list[str | bool]] = {
        "Column Name": [col["Name"] for col, _ in flagged],
        "Type": [col["Type"] for col, _ in flagged],
        "Partition": [is_partition for _, is_partition in flagged],
        "Comment": [col.get("Comment", "") for col, _ in flagged],
    }
    return pd.DataFrame(data=frame_data)
@apply_configs
def get_table_location(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> str:
    """Get table's location on Glue catalog.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Table's location.

    Raises
    ------
    exceptions.InvalidTable
        If the response lacks ``Table.StorageDescriptor.Location``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.catalog.get_table_location(database='default', table='my_table')
    's3://bucket/prefix/'

    """
    glue = _utils.client("glue", session=boto3_session)
    request = _catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table)
    details = glue.get_table(**request)
    try:
        location: str = details["Table"]["StorageDescriptor"]["Location"]
    except KeyError as ex:
        raise exceptions.InvalidTable(f"{database}.{table}") from ex
    return location
def get_connection(
    name: str, catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> dict[str, Any]:
    """Get Glue connection details.

    Parameters
    ----------
    name
        Connection name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        API Response for:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_connection
        When the connection properties contain ``ENCRYPTED_PASSWORD``, the value is decrypted
        through KMS and added to the properties under ``PASSWORD``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> res = wr.catalog.get_connection(name='my_connection')

    """
    glue = _utils.client("glue", session=boto3_session)
    # The call goes through ``_utils.try_it`` configured for "ThrottlingException" with up to 3 tries.
    connection = _utils.try_it(
        f=glue.get_connection,
        ex=botocore.exceptions.ClientError,
        ex_code="ThrottlingException",
        max_num_tries=3,
        **_catalog_id(catalog_id=catalog_id, Name=name, HidePassword=False),
    )["Connection"]
    properties = connection["ConnectionProperties"]
    if "ENCRYPTED_PASSWORD" in properties:
        kms = _utils.client(service_name="kms", session=boto3_session)
        # The encrypted password is stored base64-encoded; KMS expects the raw ciphertext bytes.
        ciphertext = base64.b64decode(properties["ENCRYPTED_PASSWORD"])
        plaintext = kms.decrypt(CiphertextBlob=ciphertext)["Plaintext"]
        properties["PASSWORD"] = plaintext.decode("utf-8")
    return cast(Dict[str, Any], connection)
@apply_configs
def get_parquet_partitions(
    database: str,
    table: str,
    expression: str | None = None,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, list[str]]:
    """Get all partitions from a Table in the AWS Glue Catalog.

    Expression argument instructions:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    expression
        An expression that filters the partitions to be returned.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        partitions_values: Dictionary with keys as S3 path locations and values as a
        list of partitions values as str (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}).

    Examples
    --------
    Fetch all partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_parquet_partitions(
    ...     database='default',
    ...     table='my_table',
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10'],
        's3://bucket/prefix/y=2020/m=11/': ['2020', '11'],
        's3://bucket/prefix/y=2020/m=12/': ['2020', '12']
    }

    Filtering partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_parquet_partitions(
    ...     database='default',
    ...     table='my_table',
    ...     expression='m=10'
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10']
    }

    """
    # Pure delegation: no format-specific handling happens in this function.
    partitions_values: dict[str, list[str]] = _get_partitions(
        database=database,
        table=table,
        expression=expression,
        catalog_id=catalog_id,
        boto3_session=boto3_session,
    )
    return partitions_values
@apply_configs
def get_csv_partitions(
    database: str,
    table: str,
    expression: str | None = None,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, list[str]]:
    """Get all partitions from a Table in the AWS Glue Catalog.

    Expression argument instructions:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    expression
        An expression that filters the partitions to be returned.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        partitions_values: Dictionary with keys as S3 path locations and values as a
        list of partitions values as str (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}).

    Examples
    --------
    Fetch all partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_csv_partitions(
    ...     database='default',
    ...     table='my_table',
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10'],
        's3://bucket/prefix/y=2020/m=11/': ['2020', '11'],
        's3://bucket/prefix/y=2020/m=12/': ['2020', '12']
    }

    Filtering partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_csv_partitions(
    ...     database='default',
    ...     table='my_table',
    ...     expression='m=10'
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10']
    }

    """
    # Pure delegation: no format-specific handling happens in this function.
    partitions_values: dict[str, list[str]] = _get_partitions(
        database=database,
        table=table,
        expression=expression,
        catalog_id=catalog_id,
        boto3_session=boto3_session,
    )
    return partitions_values
@apply_configs
def get_partitions(
    database: str,
    table: str,
    expression: str | None = None,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, list[str]]:
    """Get all partitions from a Table in the AWS Glue Catalog.

    Expression argument instructions:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    expression
        An expression that filters the partitions to be returned.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        partitions_values: Dictionary with keys as S3 path locations and values as a
        list of partitions values as str (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}).

    Examples
    --------
    Fetch all partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_partitions(
    ...     database='default',
    ...     table='my_table',
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10'],
        's3://bucket/prefix/y=2020/m=11/': ['2020', '11'],
        's3://bucket/prefix/y=2020/m=12/': ['2020', '12']
    }

    Filtering partitions

    >>> import awswrangler as wr
    >>> wr.catalog.get_partitions(
    ...     database='default',
    ...     table='my_table',
    ...     expression='m=10'
    ... )
    {
        's3://bucket/prefix/y=2020/m=10/': ['2020', '10']
    }

    """
    # Public entry point; the pagination itself lives in the private helper.
    partitions_values: dict[str, list[str]] = _get_partitions(
        database=database,
        table=table,
        expression=expression,
        catalog_id=catalog_id,
        boto3_session=boto3_session,
    )
    return partitions_values
def get_table_parameters(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, str]:
    """Get all parameters.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Dictionary of parameters. Empty if the table definition carries no ``Parameters`` entry.

    Examples
    --------
    >>> import awswrangler as wr
    >>> pars = wr.catalog.get_table_parameters(database="...", table="...")

    """
    client_glue = _utils.client("glue", session=boto3_session)
    response = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table))
    # A table without a "Parameters" entry yields an empty dict instead of a KeyError,
    # so callers always receive the annotated return type.
    parameters: dict[str, str] = response["Table"].get("Parameters", {})
    return parameters
def get_table_description(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> str | None:
    """Get table description.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Description if exists, otherwise ``None``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> desc = wr.catalog.get_table_description(database="...", table="...")

    """
    glue = _utils.client("glue", session=boto3_session)
    request = _catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table)
    table_details = glue.get_table(**request)["Table"]
    description: str | None = table_details.get("Description")
    return description
@apply_configs
def get_columns_comments(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, str | None]:
    """Get all columns comments.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Columns comments. e.g. {"col1": "foo boo bar", "col2": None}.
        Partition keys are included; a column without a comment maps to ``None``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> pars = wr.catalog.get_columns_comments(database="...", table="...")

    """
    client_glue = _utils.client("glue", session=boto3_session)
    response = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table))
    details = response["Table"]
    # Tolerate a table definition without a storage descriptor or column list
    # rather than failing with a KeyError.
    columns = details.get("StorageDescriptor", {}).get("Columns", [])
    partition_keys = details.get("PartitionKeys", [])
    # Partition keys come last, so on a name clash their comment wins.
    comments: dict[str, str | None] = {
        col["Name"]: col.get("Comment") for col in itertools.chain(columns, partition_keys)
    }
    return comments
@apply_configs
def get_columns_parameters(
    database: str,
    table: str,
    catalog_id: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, dict[str, str] | None]:
    """Get all columns parameters.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Columns parameters, keyed by column name. Partition keys are included;
        a column without parameters maps to ``None``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> pars = wr.catalog.get_columns_parameters(database="...", table="...")

    """
    client_glue = _utils.client("glue", session=boto3_session)
    response = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table))
    details = response["Table"]
    # Tolerate a table definition without a storage descriptor or column list
    # rather than failing with a KeyError.
    columns = details.get("StorageDescriptor", {}).get("Columns", [])
    partition_keys = details.get("PartitionKeys", [])
    # Partition keys come last, so on a name clash their parameters win.
    parameters: dict[str, dict[str, str] | None] = {
        col["Name"]: col.get("Parameters") for col in itertools.chain(columns, partition_keys)
    }
    return parameters
@apply_configs
def get_table_versions(
    database: str, table: str, catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> list[dict[str, Any]]:
    """Get all versions.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        List of table inputs:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_table_versions

    Examples
    --------
    >>> import awswrangler as wr
    >>> tables_versions = wr.catalog.get_table_versions(database="...", table="...")

    """
    glue = _utils.client("glue", session=boto3_session)
    request = _catalog_id(DatabaseName=database, TableName=table, catalog_id=catalog_id)
    pages = glue.get_paginator("get_table_versions").paginate(**request)
    # Flatten the "TableVersions" entries of every page into a single list.
    return [cast(Dict[str, Any], version) for page in pages for version in page["TableVersions"]]
@apply_configs
def get_table_number_of_versions(
    database: str, table: str, catalog_id: str | None = None, boto3_session: boto3.Session | None = None
) -> int:
    """Get total number of versions.

    Parameters
    ----------
    database
        Database name.
    table
        Table name.
    catalog_id
        The ID of the Data Catalog from which to retrieve Databases.
        If ``None`` is provided, the AWS account ID is used by default.
    boto3_session
        The default boto3 session will be used if **boto3_session** receive ``None``.

    Returns
    -------
        Total number of versions.

    Examples
    --------
    >>> import awswrangler as wr
    >>> num = wr.catalog.get_table_number_of_versions(database="...", table="...")

    """
    glue = _utils.client(service_name="glue", session=boto3_session)
    request = _catalog_id(DatabaseName=database, TableName=table, catalog_id=catalog_id)
    pages = glue.get_paginator("get_table_versions").paginate(**request)
    # Only the per-page counts are needed, so the versions themselves are never accumulated.
    return sum(len(page["TableVersions"]) for page in pages)