pyiceberg/catalog/dynamodb.py (546 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import uuid
from time import time
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    Union,
)

import boto3

from pyiceberg.catalog import (
    BOTOCORE_SESSION,
    ICEBERG,
    METADATA_LOCATION,
    PREVIOUS_METADATA_LOCATION,
    TABLE_TYPE,
    MetastoreCatalog,
    PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
    ConditionalCheckFailedException,
    GenericDynamoDbError,
    NamespaceAlreadyExistsError,
    NamespaceNotEmptyError,
    NoSuchIcebergTableError,
    NoSuchNamespaceError,
    NoSuchPropertyException,
    NoSuchTableError,
    TableAlreadyExistsError,
)
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableResponse, Table
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
    TableRequirement,
    TableUpdate,
)
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.utils.properties import get_first_property_value

if TYPE_CHECKING:
    import pyarrow as pa

DYNAMODB_CLIENT = "dynamodb"

DYNAMODB_COL_IDENTIFIER = "identifier"
DYNAMODB_COL_NAMESPACE = "namespace"
DYNAMODB_COL_VERSION = "v"
DYNAMODB_COL_UPDATED_AT = "updated_at"
DYNAMODB_COL_CREATED_AT = "created_at"
DYNAMODB_NAMESPACE = "NAMESPACE"
DYNAMODB_NAMESPACE_GSI = "namespace-identifier"
DYNAMODB_PAY_PER_REQUEST = "PAY_PER_REQUEST"

DYNAMODB_TABLE_NAME = "table-name"
DYNAMODB_TABLE_NAME_DEFAULT = "iceberg"

PROPERTY_KEY_PREFIX = "p."

ACTIVE = "ACTIVE"
ITEM = "Item"

DYNAMODB_PROFILE_NAME = "dynamodb.profile-name"
DYNAMODB_REGION = "dynamodb.region"
DYNAMODB_ACCESS_KEY_ID = "dynamodb.access-key-id"
DYNAMODB_SECRET_ACCESS_KEY = "dynamodb.secret-access-key"
DYNAMODB_SESSION_TOKEN = "dynamodb.session-token"


class DynamoDbCatalog(MetastoreCatalog):
    """Iceberg catalog that stores namespaces and table pointers as items in one DynamoDB table.

    Every item is keyed by (identifier, namespace). A namespace is stored with the
    fixed identifier ``NAMESPACE``; a table is stored with the identifier
    ``<database>.<table>``. User supplied properties are stored as string
    attributes whose names carry the ``p.`` prefix.
    """

    def __init__(self, name: str, **properties: str):
        """Create the boto3 DynamoDB client and make sure the backing catalog table exists.

        Args:
            name: Name of the catalog.
            **properties: Catalog properties. ``dynamodb.*`` credentials/region take
                precedence over the generic AWS ones; ``table-name`` selects the
                DynamoDB table (defaults to ``iceberg``).
        """
        super().__init__(name, **properties)

        session = boto3.Session(
            profile_name=properties.get(DYNAMODB_PROFILE_NAME),
            region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
            botocore_session=properties.get(BOTOCORE_SESSION),
            aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
            aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
            aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
        )
        self.dynamodb = session.client(DYNAMODB_CLIENT)
        self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT)
        self._ensure_catalog_table_exists_or_create()

    def _ensure_catalog_table_exists_or_create(self) -> None:
        """Create the DynamoDB table backing this catalog unless it already exists.

        Raises:
            GenericDynamoDbError: If DynamoDB rejects the table creation.
        """
        if self._dynamodb_table_exists():
            return None

        try:
            self.dynamodb.create_table(
                TableName=self.dynamodb_table_name,
                AttributeDefinitions=CREATE_CATALOG_ATTRIBUTE_DEFINITIONS,
                KeySchema=CREATE_CATALOG_KEY_SCHEMA,
                GlobalSecondaryIndexes=CREATE_CATALOG_GLOBAL_SECONDARY_INDEXES,
                BillingMode=DYNAMODB_PAY_PER_REQUEST,
            )
        except (
            self.dynamodb.exceptions.ResourceInUseException,
            self.dynamodb.exceptions.LimitExceededException,
            self.dynamodb.exceptions.InternalServerError,
        ) as e:
            raise GenericDynamoDbError(_error_message(e)) from e

    def _dynamodb_table_exists(self) -> bool:
        """Return whether the backing DynamoDB table exists.

        Raises:
            GenericDynamoDbError: If the lookup fails, or the table exists but is not ACTIVE.
        """
        try:
            response = self.dynamodb.describe_table(TableName=self.dynamodb_table_name)
        except self.dynamodb.exceptions.ResourceNotFoundException:
            return False
        except self.dynamodb.exceptions.InternalServerError as e:
            raise GenericDynamoDbError(_error_message(e)) from e

        if response["Table"]["TableStatus"] != ACTIVE:
            raise GenericDynamoDbError(f"DynamoDB table for catalog {self.dynamodb_table_name} is not {ACTIVE}")
        return True

    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """
        Create an Iceberg table.

        Args:
            identifier: Table identifier.
            schema: Table's schema.
            location: Location for the table. Optional Argument.
            partition_spec: PartitionSpec for the table.
            sort_order: SortOrder for the table.
            properties: Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            TableAlreadyExistsError: If a table with the name already exists.
            NoSuchNamespaceError: If the namespace of the table does not exist.
            ValueError: If the identifier is invalid, or no path is given to store metadata.

        """
        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore

        database_name, table_name = self.identifier_to_database_and_table(identifier)

        location = self._resolve_table_location(location, database_name, table_name)
        provider = load_location_provider(table_location=location, table_properties=properties)
        metadata_location = provider.new_table_metadata_file_location()

        metadata = new_table_metadata(
            location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
        )

        # Verify the namespace before touching storage so that a missing namespace
        # does not leave an orphaned metadata file behind.
        self._ensure_namespace_exists(database_name=database_name)

        io = load_file_io(properties=self.properties, location=metadata_location)
        self._write_metadata(metadata, io, metadata_location)

        try:
            self._put_dynamo_item(
                item=_get_create_table_item(
                    database_name=database_name, table_name=table_name, properties=properties, metadata_location=metadata_location
                ),
                condition_expression=f"attribute_not_exists({DYNAMODB_COL_IDENTIFIER})",
            )
        except ConditionalCheckFailedException as e:
            raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e

        return self.load_table(identifier=identifier)

    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
        """Register a new table using existing metadata.

        Args:
            identifier Union[str, Identifier]: Table identifier for the table
            metadata_location str: The location to the metadata

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
        """
        raise NotImplementedError

    def commit_table(
        self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
    ) -> CommitTableResponse:
        """Commit updates to a table.

        Args:
            table (Table): The table to be updated.
            requirements: (Tuple[TableRequirement, ...]): Table requirements.
            updates: (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
        """
        raise NotImplementedError

    def load_table(self, identifier: Union[str, Identifier]) -> Table:
        """
        Load the table's metadata and returns the table instance.

        You can also use this method to check for table existence using
        'try catalog.load_table() except NoSuchTableError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier: Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
        dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
        return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)

    def drop_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop a table.

        Args:
            identifier: Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

        try:
            self._delete_dynamo_item(
                namespace=database_name,
                identifier=f"{database_name}.{table_name}",
                condition_expression=f"attribute_exists({DYNAMODB_COL_IDENTIFIER})",
            )
        except ConditionalCheckFailedException as e:
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
        """Rename a fully classified table name.

        The rename is performed as "put the new item, then delete the old item".
        If deleting the old item fails, the newly created item is rolled back on a
        best-effort basis.

        Args:
            from_identifier: Existing table identifier.
            to_identifier: New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            ValueError: When from table identifier is invalid, or the old table could not be dropped.
            NoSuchTableError: When a table with the name does not exist.
            NoSuchIcebergTableError: When from table is not a valid iceberg table.
            NoSuchPropertyException: When from table miss some required properties.
            NoSuchNamespaceError: When the destination namespace doesn't exist.
            TableAlreadyExistsError: When the destination table already exists.
        """
        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
        to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)

        from_table_item = self._get_iceberg_table_item(database_name=from_database_name, table_name=from_table_name)

        try:
            # Verify that from_identifier is a valid iceberg table
            self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=from_table_item)
        except NoSuchPropertyException as e:
            raise NoSuchPropertyException(
                f"Failed to rename table {from_database_name}.{from_table_name} since it is missing required properties"
            ) from e
        except NoSuchIcebergTableError as e:
            raise NoSuchIcebergTableError(
                f"Failed to rename table {from_database_name}.{from_table_name} since it is not a valid iceberg table"
            ) from e

        self._ensure_namespace_exists(database_name=from_database_name)
        self._ensure_namespace_exists(database_name=to_database_name)

        try:
            self._put_dynamo_item(
                item=_get_rename_table_item(
                    from_dynamo_table_item=from_table_item, to_database_name=to_database_name, to_table_name=to_table_name
                ),
                condition_expression=f"attribute_not_exists({DYNAMODB_COL_IDENTIFIER})",
            )
        except ConditionalCheckFailedException as e:
            raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e

        try:
            self.drop_table(from_identifier)
        except (NoSuchTableError, GenericDynamoDbError) as e:
            log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. "

            try:
                self.drop_table(to_identifier)
                log_message += f"Rolled back table creation for {to_database_name}.{to_table_name}."
            except (NoSuchTableError, GenericDynamoDbError):
                log_message += (
                    f"Failed to roll back table creation for {to_database_name}.{to_table_name}. Please clean up manually"
                )

            raise ValueError(log_message) from e

        return self.load_table(to_identifier)

    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        """Create a namespace in the catalog.

        Args:
            namespace: Namespace identifier.
            properties: A string dictionary of properties for the given namespace.

        Raises:
            ValueError: If the identifier is invalid.
            NamespaceAlreadyExistsError: If a namespace with the given name already exists.
        """
        database_name = self.identifier_to_database(namespace)

        try:
            self._put_dynamo_item(
                item=_get_create_database_item(database_name=database_name, properties=properties),
                condition_expression=f"attribute_not_exists({DYNAMODB_COL_NAMESPACE})",
            )
        except ConditionalCheckFailedException as e:
            raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e

    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
        """Drop a namespace.

        A namespace can only be dropped if it is empty.

        Args:
            namespace: Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
            NamespaceNotEmptyError: If the namespace is not empty.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        table_identifiers = self.list_tables(namespace=database_name)

        if table_identifiers:
            raise NamespaceNotEmptyError(f"Database {database_name} is not empty")

        try:
            self._delete_dynamo_item(
                namespace=database_name,
                identifier=DYNAMODB_NAMESPACE,
                condition_expression=f"attribute_exists({DYNAMODB_COL_IDENTIFIER})",
            )
        except ConditionalCheckFailedException as e:
            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e

    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List Iceberg tables under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.

        Raises:
            GenericDynamoDbError: If the DynamoDB query fails.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)

        items = self._query_items(
            IndexName=DYNAMODB_NAMESPACE_GSI,
            KeyConditionExpression=f"{DYNAMODB_COL_NAMESPACE} = :namespace ",
            ExpressionAttributeValues={
                ":namespace": {
                    "S": database_name,
                }
            },
        )

        table_identifiers: List[Identifier] = []
        for item in items:
            identifier_col = _convert_dynamo_item_to_regular_dict(item)[DYNAMODB_COL_IDENTIFIER]
            # The namespace's own marker item lives in the same partition; skip it.
            if identifier_col == DYNAMODB_NAMESPACE:
                continue
            table_identifiers.append(self.identifier_to_tuple(identifier_col))

        return table_identifiers

    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
        """List top-level namespaces from the catalog.

        We do not support hierarchical namespace.

        Returns:
            List[Identifier]: a List of namespace identifiers.

        Raises:
            GenericDynamoDbError: If the DynamoDB query fails.
        """
        # Hierarchical namespace is not supported. Return an empty list
        if namespace:
            return []

        items = self._query_items(
            ConsistentRead=True,
            KeyConditionExpression=f"{DYNAMODB_COL_IDENTIFIER} = :identifier",
            ExpressionAttributeValues={
                ":identifier": {
                    "S": DYNAMODB_NAMESPACE,
                }
            },
        )

        return [
            self.identifier_to_tuple(_convert_dynamo_item_to_regular_dict(item)[DYNAMODB_COL_NAMESPACE]) for item in items
        ]

    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
        """
        Get properties for a namespace.

        Args:
            namespace: Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        namespace_item = self._get_iceberg_namespace_item(database_name=database_name)
        namespace_dict = _convert_dynamo_item_to_regular_dict(namespace_item)
        return _get_namespace_properties(namespace_dict=namespace_dict)

    def update_namespace_properties(
        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
    ) -> PropertiesUpdateSummary:
        """
        Remove or update provided property keys for a namespace.

        Args:
            namespace: Namespace identifier
            removals: Set of property keys that need to be removed. Optional Argument.
            updates: Properties to be updated for the given namespace.

        Returns:
            PropertiesUpdateSummary: Summary of the removed, updated and missing keys.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid.
            ValueError: If removals and updates have overlapping keys.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        namespace_item = self._get_iceberg_namespace_item(database_name=database_name)
        namespace_dict = _convert_dynamo_item_to_regular_dict(namespace_item)
        current_properties = _get_namespace_properties(namespace_dict=namespace_dict)

        properties_update_summary, updated_properties = self._get_updated_props_and_update_summary(
            current_properties=current_properties, removals=removals, updates=updates
        )

        try:
            self._put_dynamo_item(
                item=_get_update_database_item(
                    namespace_item=namespace_item,
                    updated_properties=updated_properties,
                ),
                condition_expression=f"attribute_exists({DYNAMODB_COL_NAMESPACE})",
            )
        except ConditionalCheckFailedException as e:
            raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e

        return properties_update_summary

    def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        raise NotImplementedError

    def drop_view(self, identifier: Union[str, Identifier]) -> None:
        raise NotImplementedError

    def view_exists(self, identifier: Union[str, Identifier]) -> bool:
        raise NotImplementedError

    def _query_items(self, **query_kwargs: Any) -> List[Dict[str, Any]]:
        """Run a paginated DynamoDB ``query`` against the catalog table and collect all items.

        boto3 paginators are lazy: the requests are only sent while the pages are
        being iterated, so the iteration itself has to live inside the ``try``.

        Args:
            **query_kwargs: Extra arguments forwarded to the ``query`` operation.

        Returns:
            The raw DynamoDB items of every page, in order.

        Raises:
            GenericDynamoDbError: If DynamoDB fails to serve the query.
        """
        paginator = self.dynamodb.get_paginator("query")
        items: List[Dict[str, Any]] = []

        try:
            for page in paginator.paginate(TableName=self.dynamodb_table_name, **query_kwargs):
                items.extend(page["Items"])
        except (
            self.dynamodb.exceptions.ProvisionedThroughputExceededException,
            self.dynamodb.exceptions.RequestLimitExceeded,
            self.dynamodb.exceptions.InternalServerError,
            self.dynamodb.exceptions.ResourceNotFoundException,
        ) as e:
            raise GenericDynamoDbError(_error_message(e)) from e

        return items

    def _get_iceberg_table_item(self, database_name: str, table_name: str) -> Dict[str, Any]:
        """Fetch the raw DynamoDB item of a table, raising NoSuchTableError when it is absent."""
        try:
            return self._get_dynamo_item(identifier=f"{database_name}.{table_name}", namespace=database_name)
        except ValueError as e:
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

    def _get_iceberg_namespace_item(self, database_name: str) -> Dict[str, Any]:
        """Fetch the raw DynamoDB item of a namespace, raising NoSuchNamespaceError when it is absent."""
        try:
            return self._get_dynamo_item(identifier=DYNAMODB_NAMESPACE, namespace=database_name)
        except ValueError as e:
            raise NoSuchNamespaceError(f"Namespace does not exist: {database_name}") from e

    def _ensure_namespace_exists(self, database_name: str) -> Dict[str, Any]:
        """Return the namespace item, raising NoSuchNamespaceError when the namespace is missing."""
        return self._get_iceberg_namespace_item(database_name)

    def _get_dynamo_item(self, identifier: str, namespace: str) -> Dict[str, Any]:
        """Read a single item by its (identifier, namespace) key with a consistent read.

        Raises:
            ValueError: If the item (or the catalog table) does not exist.
            GenericDynamoDbError: If DynamoDB fails to serve the request.
        """
        try:
            response = self.dynamodb.get_item(
                TableName=self.dynamodb_table_name,
                ConsistentRead=True,
                Key={
                    DYNAMODB_COL_IDENTIFIER: {
                        "S": identifier,
                    },
                    DYNAMODB_COL_NAMESPACE: {
                        "S": namespace,
                    },
                },
            )
        except self.dynamodb.exceptions.ResourceNotFoundException as e:
            raise ValueError(f"Item not found. identifier: {identifier} - namespace: {namespace}") from e
        except (
            self.dynamodb.exceptions.ProvisionedThroughputExceededException,
            self.dynamodb.exceptions.RequestLimitExceeded,
            self.dynamodb.exceptions.InternalServerError,
        ) as e:
            raise GenericDynamoDbError(_error_message(e)) from e

        if ITEM not in response:
            raise ValueError(f"Item not found. identifier: {identifier} - namespace: {namespace}")
        return response[ITEM]

    def _put_dynamo_item(self, item: Dict[str, Any], condition_expression: str) -> None:
        """Write an item, guarded by a condition expression.

        Raises:
            ConditionalCheckFailedException: If the condition expression is not satisfied.
            GenericDynamoDbError: If DynamoDB fails to serve the request.
        """
        try:
            self.dynamodb.put_item(TableName=self.dynamodb_table_name, Item=item, ConditionExpression=condition_expression)
        except self.dynamodb.exceptions.ConditionalCheckFailedException as e:
            raise ConditionalCheckFailedException(f"Condition expression check failed: {condition_expression} - {item}") from e
        except (
            self.dynamodb.exceptions.ProvisionedThroughputExceededException,
            self.dynamodb.exceptions.RequestLimitExceeded,
            self.dynamodb.exceptions.InternalServerError,
            self.dynamodb.exceptions.ResourceNotFoundException,
            self.dynamodb.exceptions.ItemCollectionSizeLimitExceededException,
            self.dynamodb.exceptions.TransactionConflictException,
        ) as e:
            raise GenericDynamoDbError(_error_message(e)) from e

    def _delete_dynamo_item(self, namespace: str, identifier: str, condition_expression: str) -> None:
        """Delete the item with the given key, guarded by a condition expression.

        Raises:
            ConditionalCheckFailedException: If the condition expression is not satisfied.
            GenericDynamoDbError: If DynamoDB fails to serve the request.
        """
        try:
            self.dynamodb.delete_item(
                TableName=self.dynamodb_table_name,
                Key={
                    DYNAMODB_COL_IDENTIFIER: {
                        "S": identifier,
                    },
                    DYNAMODB_COL_NAMESPACE: {
                        "S": namespace,
                    },
                },
                ConditionExpression=condition_expression,
            )
        except self.dynamodb.exceptions.ConditionalCheckFailedException as e:
            raise ConditionalCheckFailedException(
                f"Condition expression check failed: {condition_expression} - {identifier}"
            ) from e
        except (
            self.dynamodb.exceptions.ProvisionedThroughputExceededException,
            self.dynamodb.exceptions.RequestLimitExceeded,
            self.dynamodb.exceptions.InternalServerError,
            self.dynamodb.exceptions.ResourceNotFoundException,
            self.dynamodb.exceptions.ItemCollectionSizeLimitExceededException,
            self.dynamodb.exceptions.TransactionConflictException,
        ) as e:
            raise GenericDynamoDbError(_error_message(e)) from e

    def _convert_dynamo_table_item_to_iceberg_table(self, dynamo_table_item: Dict[str, Any]) -> Table:
        """Build a Table from a raw DynamoDB table item by reading the metadata file it points to.

        Raises:
            NoSuchPropertyException: If a required attribute is missing from the item.
            NoSuchIcebergTableError: If the item's table type is not Iceberg.
        """
        table_dict = _convert_dynamo_item_to_regular_dict(dynamo_table_item)

        required_props = [_add_property_prefix(prop) for prop in (TABLE_TYPE, METADATA_LOCATION)] + [
            DYNAMODB_COL_IDENTIFIER,
            DYNAMODB_COL_NAMESPACE,
            DYNAMODB_COL_CREATED_AT,
        ]
        for prop in required_props:
            if prop not in table_dict:
                raise NoSuchPropertyException(f"Iceberg required property {prop} is missing: {dynamo_table_item}")

        table_type = table_dict[_add_property_prefix(TABLE_TYPE)]
        identifier = table_dict[DYNAMODB_COL_IDENTIFIER]
        metadata_location = table_dict[_add_property_prefix(METADATA_LOCATION)]
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

        if table_type.lower() != ICEBERG:
            raise NoSuchIcebergTableError(
                f"Property table_type is {table_type}, expected {ICEBERG}: {database_name}.{table_name}"
            )

        io = load_file_io(properties=self.properties, location=metadata_location)
        file = io.new_input(metadata_location)
        metadata = FromInputFile.table_metadata(file)
        return Table(
            identifier=(database_name, table_name),
            metadata=metadata,
            metadata_location=metadata_location,
            io=self._load_file_io(metadata.properties, metadata_location),
            catalog=self,
        )


def _error_message(error: BaseException) -> str:
    """Return a human readable message for an error raised by the DynamoDB client.

    botocore exceptions do not reliably expose a ``message`` attribute, so fall
    back to the string form of the exception when it is absent.
    """
    message = getattr(error, "message", None)
    return message if isinstance(message, str) and message else str(error)


def _get_create_table_item(database_name: str, table_name: str, properties: Properties, metadata_location: str) -> Dict[str, Any]:
    """Build the DynamoDB item describing a newly created table."""
    current_timestamp_ms = str(round(time() * 1000))
    _dict: Dict[str, Any] = {
        DYNAMODB_COL_IDENTIFIER: {
            "S": f"{database_name}.{table_name}",
        },
        DYNAMODB_COL_NAMESPACE: {
            "S": database_name,
        },
        DYNAMODB_COL_VERSION: {
            "S": str(uuid.uuid4()),
        },
        DYNAMODB_COL_CREATED_AT: {
            "N": current_timestamp_ms,
        },
        DYNAMODB_COL_UPDATED_AT: {
            "N": current_timestamp_ms,
        },
    }

    for key, val in properties.items():
        _dict[_add_property_prefix(key)] = {"S": val}

    # Catalog-managed properties are written last so user properties cannot override them.
    _dict[_add_property_prefix(TABLE_TYPE)] = {"S": ICEBERG.upper()}
    _dict[_add_property_prefix(METADATA_LOCATION)] = {"S": metadata_location}
    _dict[_add_property_prefix(PREVIOUS_METADATA_LOCATION)] = {"S": ""}

    return _dict


def _get_rename_table_item(from_dynamo_table_item: Dict[str, Any], to_database_name: str, to_table_name: str) -> Dict[str, Any]:
    """Build the DynamoDB item for the renamed table.

    The source item is copied (including its per-attribute value dicts) so the
    caller's item is left untouched.
    """
    _dict: Dict[str, Any] = {column: dict(value) for column, value in from_dynamo_table_item.items()}
    current_timestamp_ms = str(round(time() * 1000))
    _dict[DYNAMODB_COL_IDENTIFIER]["S"] = f"{to_database_name}.{to_table_name}"
    _dict[DYNAMODB_COL_NAMESPACE]["S"] = to_database_name
    _dict[DYNAMODB_COL_VERSION]["S"] = str(uuid.uuid4())
    _dict[DYNAMODB_COL_UPDATED_AT]["N"] = current_timestamp_ms
    return _dict


def _get_create_database_item(database_name: str, properties: Properties) -> Dict[str, Any]:
    """Build the DynamoDB item describing a newly created namespace."""
    current_timestamp_ms = str(round(time() * 1000))
    _dict: Dict[str, Any] = {
        DYNAMODB_COL_IDENTIFIER: {
            "S": DYNAMODB_NAMESPACE,
        },
        DYNAMODB_COL_NAMESPACE: {
            "S": database_name,
        },
        DYNAMODB_COL_VERSION: {
            "S": str(uuid.uuid4()),
        },
        DYNAMODB_COL_CREATED_AT: {
            "N": current_timestamp_ms,
        },
        DYNAMODB_COL_UPDATED_AT: {
            "N": current_timestamp_ms,
        },
    }

    for key, val in properties.items():
        _dict[_add_property_prefix(key)] = {"S": val}

    return _dict


def _get_update_database_item(namespace_item: Dict[str, Any], updated_properties: Properties) -> Dict[str, Any]:
    """Build the replacement DynamoDB item for a namespace whose properties changed.

    Key columns and the creation timestamp are carried over from the existing
    item; the version and update timestamp are refreshed.
    """
    current_timestamp_ms = str(round(time() * 1000))

    _dict: Dict[str, Any] = {
        DYNAMODB_COL_IDENTIFIER: namespace_item[DYNAMODB_COL_IDENTIFIER],
        DYNAMODB_COL_NAMESPACE: namespace_item[DYNAMODB_COL_NAMESPACE],
        DYNAMODB_COL_VERSION: {
            "S": str(uuid.uuid4()),
        },
        DYNAMODB_COL_CREATED_AT: namespace_item[DYNAMODB_COL_CREATED_AT],
        DYNAMODB_COL_UPDATED_AT: {
            "N": current_timestamp_ms,
        },
    }

    for key, val in updated_properties.items():
        _dict[_add_property_prefix(key)] = {"S": val}

    return _dict


CREATE_CATALOG_ATTRIBUTE_DEFINITIONS = [
    {
        "AttributeName": DYNAMODB_COL_IDENTIFIER,
        "AttributeType": "S",
    },
    {
        "AttributeName": DYNAMODB_COL_NAMESPACE,
        "AttributeType": "S",
    },
]

CREATE_CATALOG_KEY_SCHEMA = [
    {
        "AttributeName": DYNAMODB_COL_IDENTIFIER,
        "KeyType": "HASH",
    },
    {
        "AttributeName": DYNAMODB_COL_NAMESPACE,
        "KeyType": "RANGE",
    },
]


CREATE_CATALOG_GLOBAL_SECONDARY_INDEXES = [
    {
        "IndexName": DYNAMODB_NAMESPACE_GSI,
        "KeySchema": [
            {
                "AttributeName": DYNAMODB_COL_NAMESPACE,
                "KeyType": "HASH",
            },
            {
                "AttributeName": DYNAMODB_COL_IDENTIFIER,
                "KeyType": "RANGE",
            },
        ],
        "Projection": {
            "ProjectionType": "KEYS_ONLY",
        },
    }
]


def _get_namespace_properties(namespace_dict: Dict[str, str]) -> Properties:
    """Extract the user properties (attributes carrying the property prefix) with the prefix removed."""
    return {_remove_property_prefix(key): val for key, val in namespace_dict.items() if key.startswith(PROPERTY_KEY_PREFIX)}


def _convert_dynamo_item_to_regular_dict(dynamo_json: Dict[str, Any]) -> Dict[str, str]:
    """Convert a dynamo json to a regular json.

    Example of a dynamo json:
    {
        "AlbumTitle": {
            "S": "Songs About Life",
        },
        "Artist": {
            "S": "Acme Band",
        },
        "SongTitle": {
            "S": "Happy Day",
        }
    }

    Converted to regular json:
    {
        "AlbumTitle": "Songs About Life",
        "Artist": "Acme Band",
        "SongTitle": "Happy Day"
    }

    Only "S" and "N" data types are supported since those are the only ones that Iceberg is utilizing.

    Raises:
        ValueError: If an attribute does not hold exactly one typed value, or its type is not "S" or "N".
    """
    regular_json: Dict[str, str] = {}
    for column_name, val_dict in dynamo_json.items():
        keys = list(val_dict.keys())

        if len(keys) != 1:
            raise ValueError(f"Expecting only 1 key: {keys}")

        data_type = keys[0]
        if data_type not in ("S", "N"):
            raise ValueError("Only S and N data types are supported.")

        regular_json[column_name] = val_dict[data_type]

    return regular_json


def _add_property_prefix(prop: str) -> str:
    """Return the DynamoDB attribute name used to store the property ``prop``."""
    return PROPERTY_KEY_PREFIX + prop


def _remove_property_prefix(prop: str) -> str:
    """Return ``prop`` without one leading property prefix; names without the prefix are returned unchanged.

    ``str.lstrip`` must not be used here: it strips any leading run of the
    characters "p" and ".", which would turn "p.path" into "ath".
    """
    if prop.startswith(PROPERTY_KEY_PREFIX):
        return prop[len(PROPERTY_KEY_PREFIX) :]
    return prop