"""Amazon S3 Writer Delta Lake Module (PRIVATE)."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Literal
import boto3
import pandas as pd
import pyarrow as pa
from awswrangler import _data_types, _utils
from awswrangler._arrow import _df_to_table
from awswrangler.annotations import Experimental
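# At type-checking time, import deltalake directly so annotations resolve;
# at runtime, load it as an optional dependency so this module still imports
# when deltalake is not installed.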
if TYPE_CHECKING:
try:
import deltalake
except ImportError:
pass
else:
deltalake = _utils.import_optional_dependency("deltalake")


def _set_default_storage_options_kwargs(
    boto3_session: boto3.Session | None,
    s3_additional_kwargs: dict[str, Any] | None,
    s3_allow_unsafe_rename: bool,
    lock_dynamodb_table: str | None = None,
) -> dict[str, Any]:
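    # Translate the boto3 session primitives into the uppercase,
    # environment-variable-style keys (AWS_ACCESS_KEY_ID, AWS_REGION, ...)
    # that the delta-rs S3 backend reads from its storage options.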
defaults = {key.upper(): value for key, value in _utils.boto3_to_primitives(boto3_session=boto3_session).items()}
defaults["AWS_REGION"] = defaults.pop("REGION_NAME")
s3_additional_kwargs = s3_additional_kwargs or {}
s3_lock_arguments = {}
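    # These two keys switch delta-rs to its DynamoDB-backed locking provider,
    # which makes concurrent writes to the same S3 table safe.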
if lock_dynamodb_table:
s3_lock_arguments["AWS_S3_LOCKING_PROVIDER"] = "dynamodb"
s3_lock_arguments["DELTA_DYNAMO_TABLE_NAME"] = lock_dynamodb_table
return {
**defaults,
**s3_additional_kwargs,
**s3_lock_arguments,
"AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE" if s3_allow_unsafe_rename else "FALSE",
}


@_utils.check_optional_dependency(deltalake, "deltalake")
@Experimental
def to_deltalake(
df: pd.DataFrame,
path: str,
index: bool = False,
mode: Literal["error", "append", "overwrite", "ignore"] = "append",
dtype: dict[str, str] | None = None,
partition_cols: list[str] | None = None,
schema_mode: Literal["overwrite"] | None = None,
lock_dynamodb_table: str | None = None,
s3_allow_unsafe_rename: bool = False,
boto3_session: boto3.Session | None = None,
s3_additional_kwargs: dict[str, str] | None = None,
) -> None:
"""Write a DataFrame to S3 as a DeltaLake table.
This function requires the `deltalake package
<https://delta-io.github.io/delta-rs/python>`__.
Parameters
----------
df
`Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
path
S3 path for a directory where the DeltaLake table will be stored.
    index
        True to store the DataFrame index in the file, otherwise False to ignore it.
    mode
        ``append`` (default), ``overwrite``, ``ignore``, ``error``
    dtype
        Dictionary of column names and Athena/Glue types to be casted.
        Useful when you have columns with undetermined or mixed data types.
        (e.g. ``{'col name': 'bigint', 'col2 name': 'int'}``)
partition_cols
List of columns to partition the table by. Only required when creating a new table.
    schema_mode
        If set to ``"overwrite"``, allows replacing the schema of the table.
        Set to ``"merge"`` to merge the new schema with the existing one.
    lock_dynamodb_table
        DynamoDB table to use as a locking provider.
        A locking mechanism is needed to prevent unsafe concurrent writes to a Delta Lake directory when writing to S3.
        If you don't want to use a locking mechanism, you can choose to set ``s3_allow_unsafe_rename`` to True.
        For information on how to set up the lock table,
        please check `this page <https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/#dynamodb>`_.
s3_allow_unsafe_rename
Allows using the default S3 backend without support for concurrent writers.
boto3_session
If None, the default boto3 session is used.
    s3_additional_kwargs
        Forwarded to the Delta Table class for the storage options of the S3 backend.

    Examples
    --------
    Writing a Pandas DataFrame into a DeltaLake table in S3.

>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.s3.to_deltalake(
... df=pd.DataFrame({"col": [1, 2, 3]}),
... path="s3://bucket/prefix/",
... lock_dynamodb_table="my-lock-table",
... )
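
    If you don't want a locking provider, ``s3_allow_unsafe_rename`` must be
    set instead; this is only safe with a single writer (the bucket and
    prefix below are placeholders):

    >>> wr.s3.to_deltalake(
    ...     df=pd.DataFrame({"col": [4, 5, 6]}),
    ...     path="s3://bucket/prefix/",
    ...     mode="overwrite",
    ...     s3_allow_unsafe_rename=True,
    ... )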

    See Also
    --------
    deltalake.DeltaTable : Create a DeltaTable instance with the deltalake library.
    deltalake.write_deltalake : Write to a DeltaLake table.
    """
dtype = dtype if dtype else {}
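    # Derive the Arrow schema (applying any user-supplied Athena/Glue dtype
    # overrides) and convert the DataFrame into an Arrow table with it.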
schema: pa.Schema = _data_types.pyarrow_schema_from_pandas(df=df, index=index, ignore_cols=None, dtype=dtype)
table: pa.Table = _df_to_table(df, schema, index, dtype)
storage_options = _set_default_storage_options_kwargs(
boto3_session=boto3_session,
s3_additional_kwargs=s3_additional_kwargs,
s3_allow_unsafe_rename=s3_allow_unsafe_rename,
lock_dynamodb_table=lock_dynamodb_table,
)
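    # Delegate the transactional write to delta-rs.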
deltalake.write_deltalake(
table_or_uri=path,
data=table,
partition_by=partition_cols,
mode=mode,
schema_mode=schema_mode,
storage_options=storage_options,
)