awswrangler/catalog/_add.py (178 lines of code) (raw):

"""AWS Glue Catalog Delete Module.""" from __future__ import annotations import logging from typing import Any import boto3 from awswrangler import _utils, exceptions, typing from awswrangler._config import apply_configs from awswrangler.catalog._definitions import ( _check_column_type, _csv_partition_definition, _json_partition_definition, _orc_partition_definition, _parquet_partition_definition, _update_table_definition, ) from awswrangler.catalog._utils import _catalog_id, sanitize_table_name _logger: logging.Logger = logging.getLogger(__name__) def _add_partitions( database: str, table: str, boto3_session: boto3.Session | None, inputs: list[dict[str, Any]], catalog_id: str | None = None, ) -> None: chunks: list[list[dict[str, Any]]] = _utils.chunkify(lst=inputs, max_length=100) client_glue = _utils.client(service_name="glue", session=boto3_session) for chunk in chunks: res = client_glue.batch_create_partition( **_catalog_id(catalog_id=catalog_id, DatabaseName=database, TableName=table, PartitionInputList=chunk) ) if ("Errors" in res) and res["Errors"]: for error in res["Errors"]: if "ErrorDetail" in error: if "ErrorCode" in error["ErrorDetail"]: if error["ErrorDetail"]["ErrorCode"] != "AlreadyExistsException": raise exceptions.ServiceApiError(str(res["Errors"])) @apply_configs def add_csv_partitions( database: str, table: str, partitions_values: dict[str, list[str]], bucketing_info: typing.BucketingInfoTuple | None = None, catalog_id: str | None = None, compression: str | None = None, sep: str = ",", serde_library: str | None = None, serde_parameters: dict[str, str] | None = None, boto3_session: boto3.Session | None = None, columns_types: dict[str, str] | None = None, partitions_parameters: dict[str, str] | None = None, ) -> None: r"""Add partitions (metadata) to a CSV Table in the AWS Glue Catalog. Parameters ---------- database Database name. table Table name. partitions_values Dictionary with keys as S3 path locations and values as a list of partitions values as str (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}). bucketing_info Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the second element. Only `str`, `int` and `bool` are supported as column data types for bucketing. catalog_id The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. compression Compression style (``None``, ``gzip``, etc). sep String of length 1. Field delimiter for the output file. serde_library Specifies the SerDe Serialization library which will be used. You need to provide the Class library name as a string. If no library is provided the default is `org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe`. serde_parameters Dictionary of initialization parameters for the SerDe. The default is `{"field.delim": sep, "escape.delim": "\\"}`. boto3_session The default boto3 session will be used if boto3_session receive None. columns_types Only required for Hive compability. Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}). P.S. Only materialized columns please, not partition columns. partitions_parameters Dictionary with key-value pairs defining partition parameters. Examples -------- >>> import awswrangler as wr >>> wr.catalog.add_csv_partitions( ... database='default', ... table='my_table', ... partitions_values={ ... 's3://bucket/prefix/y=2020/m=10/': ['2020', '10'], ... 's3://bucket/prefix/y=2020/m=11/': ['2020', '11'], ... 's3://bucket/prefix/y=2020/m=12/': ['2020', '12'] ... } ... ) """ table = sanitize_table_name(table=table) inputs: list[dict[str, Any]] = [ _csv_partition_definition( location=k, values=v, bucketing_info=bucketing_info, compression=compression, sep=sep, columns_types=columns_types, serde_library=serde_library, serde_parameters=serde_parameters, partitions_parameters=partitions_parameters, ) for k, v in partitions_values.items() ] _add_partitions(database=database, table=table, boto3_session=boto3_session, inputs=inputs, catalog_id=catalog_id) @apply_configs def add_json_partitions( database: str, table: str, partitions_values: dict[str, list[str]], bucketing_info: typing.BucketingInfoTuple | None = None, catalog_id: str | None = None, compression: str | None = None, serde_library: str | None = None, serde_parameters: dict[str, str] | None = None, boto3_session: boto3.Session | None = None, columns_types: dict[str, str] | None = None, partitions_parameters: dict[str, str] | None = None, ) -> None: r"""Add partitions (metadata) to a JSON Table in the AWS Glue Catalog. Parameters ---------- database Database name. table Table name. partitions_values Dictionary with keys as S3 path locations and values as a list of partitions values as str (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}). bucketing_info Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the second element. Only `str`, `int` and `bool` are supported as column data types for bucketing. catalog_id The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. compression Compression style (``None``, ``gzip``, etc). serde_library Specifies the SerDe Serialization library which will be used. You need to provide the Class library name as a string. If no library is provided the default is `org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe`. serde_parameters Dictionary of initialization parameters for the SerDe. The default is `{"field.delim": sep, "escape.delim": "\\"}`. boto3_session Boto3 Session. The default boto3 session will be used if boto3_session receive None. columns_types Only required for Hive compability. Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}). P.S. Only materialized columns please, not partition columns. partitions_parameters Dictionary with key-value pairs defining partition parameters. Examples -------- >>> import awswrangler as wr >>> wr.catalog.add_json_partitions( ... database='default', ... table='my_table', ... partitions_values={ ... 's3://bucket/prefix/y=2020/m=10/': ['2020', '10'], ... 's3://bucket/prefix/y=2020/m=11/': ['2020', '11'], ... 's3://bucket/prefix/y=2020/m=12/': ['2020', '12'] ... } ... ) """ table = sanitize_table_name(table=table) inputs: list[dict[str, Any]] = [ _json_partition_definition( location=k, values=v, bucketing_info=bucketing_info, compression=compression, columns_types=columns_types, serde_library=serde_library, serde_parameters=serde_parameters, partitions_parameters=partitions_parameters, ) for k, v in partitions_values.items() ] _add_partitions(database=database, table=table, boto3_session=boto3_session, inputs=inputs, catalog_id=catalog_id) @apply_configs def add_parquet_partitions( database: str, table: str, partitions_values: dict[str, list[str]], bucketing_info: typing.BucketingInfoTuple | None = None, catalog_id: str | None = None, compression: str | None = None, boto3_session: boto3.Session | None = None, columns_types: dict[str, str] | None = None, partitions_parameters: dict[str, str] | None = None, ) -> None: """Add partitions (metadata) to a Parquet Table in the AWS Glue Catalog. Parameters ---------- database Database name. table Table name. partitions_values Dictionary with keys as S3 path locations and values as a list of partitions values as str (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}). bucketing_info Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the second element. Only `str`, `int` and `bool` are supported as column data types for bucketing. catalog_id The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. compression Compression style (``None``, ``snappy``, ``gzip``, etc). boto3_session Boto3 Session. The default boto3 session will be used if boto3_session receive None. columns_types Only required for Hive compability. Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}). P.S. Only materialized columns please, not partition columns. partitions_parameters Dictionary with key-value pairs defining partition parameters. Examples -------- >>> import awswrangler as wr >>> wr.catalog.add_parquet_partitions( ... database='default', ... table='my_table', ... partitions_values={ ... 's3://bucket/prefix/y=2020/m=10/': ['2020', '10'], ... 's3://bucket/prefix/y=2020/m=11/': ['2020', '11'], ... 's3://bucket/prefix/y=2020/m=12/': ['2020', '12'] ... } ... ) """ table = sanitize_table_name(table=table) if partitions_values: inputs: list[dict[str, Any]] = [ _parquet_partition_definition( location=k, values=v, bucketing_info=bucketing_info, compression=compression, columns_types=columns_types, partitions_parameters=partitions_parameters, ) for k, v in partitions_values.items() ] _add_partitions( database=database, table=table, boto3_session=boto3_session, inputs=inputs, catalog_id=catalog_id ) @apply_configs def add_orc_partitions( database: str, table: str, partitions_values: dict[str, list[str]], bucketing_info: typing.BucketingInfoTuple | None = None, catalog_id: str | None = None, compression: str | None = None, boto3_session: boto3.Session | None = None, columns_types: dict[str, str] | None = None, partitions_parameters: dict[str, str] | None = None, ) -> None: """Add partitions (metadata) to a ORC Table in the AWS Glue Catalog. Parameters ---------- database Database name. table Table name. partitions_values Dictionary with keys as S3 path locations and values as a list of partitions values as str (e.g. {'s3://bucket/prefix/y=2020/m=10/': ['2020', '10']}). bucketing_info Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the second element. Only `str`, `int` and `bool` are supported as column data types for bucketing. catalog_id The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. compression Compression style (``None``, ``snappy``, ``zlib``, etc). boto3_session Boto3 Session. The default boto3 session will be used if boto3_session receive None. columns_types Only required for Hive compability. Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}). P.S. Only materialized columns please, not partition columns. partitions_parameters Dictionary with key-value pairs defining partition parameters. Examples -------- >>> import awswrangler as wr >>> wr.catalog.add_orc_partitions( ... database='default', ... table='my_table', ... partitions_values={ ... 's3://bucket/prefix/y=2020/m=10/': ['2020', '10'], ... 's3://bucket/prefix/y=2020/m=11/': ['2020', '11'], ... 's3://bucket/prefix/y=2020/m=12/': ['2020', '12'] ... } ... ) """ table = sanitize_table_name(table=table) if partitions_values: inputs: list[dict[str, Any]] = [ _orc_partition_definition( location=k, values=v, bucketing_info=bucketing_info, compression=compression, columns_types=columns_types, partitions_parameters=partitions_parameters, ) for k, v in partitions_values.items() ] _add_partitions( database=database, table=table, boto3_session=boto3_session, inputs=inputs, catalog_id=catalog_id ) @apply_configs def add_column( database: str, table: str, column_name: str, column_type: str = "string", column_comment: str | None = None, boto3_session: boto3.Session | None = None, catalog_id: str | None = None, ) -> None: """Add a column in a AWS Glue Catalog table. Parameters ---------- database Database name. table Table name. column_name Column name column_type Column type. column_comment Column Comment boto3_session The default boto3 session will be used if **boto3_session** is ``None``. catalog_id The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. Examples -------- >>> import awswrangler as wr >>> wr.catalog.add_column( ... database='my_db', ... table='my_table', ... column_name='my_col', ... column_type='int' ... ) """ if _check_column_type(column_type): client_glue = _utils.client(service_name="glue", session=boto3_session) table_res = client_glue.get_table(**_catalog_id(catalog_id=catalog_id, DatabaseName=database, Name=table)) table_input: dict[str, Any] = _update_table_definition(table_res) table_input["StorageDescriptor"]["Columns"].append( {"Name": column_name, "Type": column_type, "Comment": column_comment} ) res: dict[str, Any] = client_glue.update_table( **_catalog_id(catalog_id=catalog_id, DatabaseName=database, TableInput=table_input) ) if ("Errors" in res) and res["Errors"]: for error in res["Errors"]: if "ErrorDetail" in error: if "ErrorCode" in error["ErrorDetail"]: raise exceptions.ServiceApiError(str(res["Errors"]))