in pyiceberg/io/pyarrow.py [0:0]
def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
    from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties

    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
    row_group_size = property_as_int(
        properties=table_metadata.properties,
        property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
        default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
    )
    location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)

    def write_parquet(task: WriteTask) -> DataFile:
        table_schema = table_metadata.schema()
        # if schema needs to be transformed, use the transformed schema and adjust the arrow table accordingly
        # otherwise use the original schema
        if (sanitized_schema := sanitize_column_names(table_schema)) != table_schema:
            file_schema = sanitized_schema
        else:
            file_schema = table_schema

        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        batches = [
            _to_requested_schema(
                requested_schema=file_schema,
                file_schema=task.schema,
                batch=batch,
                downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
                include_field_ids=True,
            )
            for batch in task.record_batches
        ]
        arrow_table = pa.Table.from_batches(batches)
        file_path = location_provider.new_data_location(
            data_file_name=task.generate_data_file_filename("parquet"),
            partition_key=task.partition_key,
        )
        fo = io.new_output(file_path)
        with fo.create(overwrite=True) as fos:
            with pq.ParquetWriter(fos, schema=arrow_table.schema, **parquet_writer_kwargs) as writer:
                writer.write(arrow_table, row_group_size=row_group_size)
        statistics = data_file_statistics_from_parquet_metadata(
            parquet_metadata=writer.writer.metadata,
            stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
            parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
        )
        data_file = DataFile.from_args(
            content=DataFileContent.DATA,
            file_path=file_path,
            file_format=FileFormat.PARQUET,
            partition=task.partition_key.partition if task.partition_key else Record(),
            file_size_in_bytes=len(fo),
            # After this has been fixed:
            # https://github.com/apache/iceberg-python/issues/271
            # sort_order_id=task.sort_order_id,
            sort_order_id=None,
            # Just copy these from the table for now
            spec_id=table_metadata.default_spec_id,
            equality_ids=None,
            key_metadata=None,
            **statistics.to_serialized_dict(),
        )

        return data_file

    executor = ExecutorFactory.get_or_create()
    data_files = executor.map(write_parquet, tasks)

    return iter(data_files)
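
For reference, below is a minimal standalone sketch (pyarrow only, with made-up paths, data, and row-group size) of the Arrow-level write pattern that write_parquet relies on: assemble a pa.Table from record batches, stream it through pq.ParquetWriter with an explicit row-group size, then read the closed writer's Parquet metadata, which is what feeds data_file_statistics_from_parquet_metadata above. It is an illustration of the underlying pyarrow calls, not the pyiceberg API itself.

    # Illustrative sketch of the core Arrow write path; the file path, schema,
    # and row-group size are placeholders, not values used by pyiceberg.
    import pyarrow as pa
    import pyarrow.parquet as pq

    batches = [
        pa.RecordBatch.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]}),
        pa.RecordBatch.from_pydict({"id": [4, 5], "name": ["d", "e"]}),
    ]
    arrow_table = pa.Table.from_batches(batches)

    with pq.ParquetWriter("/tmp/00000-0-example.parquet", schema=arrow_table.schema) as writer:
        writer.write(arrow_table, row_group_size=100_000)

    # As in write_parquet above, the closed writer's metadata exposes the
    # Parquet file statistics (row counts, per-column min/max, null counts).
    print(writer.writer.metadata.num_rows, writer.writer.metadata.num_row_groups)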