in smallpond/execution/task.py
def _create_file_writer(self, partition_idx: int, schema: arrow.Schema):
    # Open a buffered binary file for this partition and keep the handle so it
    # can be closed after the writer is finalized.
    partition_filename = f"{self.output_filename}-{partition_idx}.parquet"
    partition_path = os.path.join(self.runtime_output_abspath, partition_filename)
    self._partition_files[partition_idx] = open(partition_path, "wb", buffering=self.write_buffer_size)
    output_file = self._partition_files[partition_idx]
    # Record the new file in this partition's dataset before any rows are written.
    self.partitioned_datasets[partition_idx].paths.append(partition_filename)
    self._partition_writers[partition_idx] = parquet.ParquetWriter(
        where=output_file,
        # Embed the partition info (one entry for the partition dimension, one
        # for the data-partition column) as parquet key-value metadata, so
        # readers can recover how the file was partitioned.
        schema=schema.with_metadata(
            self.parquet_kv_metadata_bytes(
                [
                    PartitionInfo(partition_idx, self.npartitions, self.dimension),
                    PartitionInfo(partition_idx, self.npartitions, self.data_partition_column),
                ]
            )
        ),
        use_dictionary=self.parquet_dictionary_encoding,
        compression=(self.parquet_compression if self.parquet_compression is not None else "NONE"),
        compression_level=self.parquet_compression_level,
        # Batch and page sizes are derived from the row-group targets, floored
        # at 16 Ki rows and 64 MiB respectively.
        write_batch_size=max(16 * 1024, self.parquet_row_group_size // 8),
        data_page_size=max(64 * MB, self.parquet_row_group_bytes // 8),
    )
    return self._partition_writers[partition_idx]
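
For context, here is a minimal, self-contained sketch of the same pattern: open a buffered file per partition and wrap it in a pyarrow ParquetWriter whose schema carries partition info as key-value metadata. The names make_partition_writer and PART_KEY are illustrative, not smallpond's, and JSON stands in for whatever serialization parquet_kv_metadata_bytes actually uses.

# Hypothetical sketch of the per-partition writer pattern, independent of smallpond.
import json
import os
import tempfile

import pyarrow as pa
import pyarrow.parquet as pq

PART_KEY = b"partition_info"  # illustrative metadata key, not smallpond's

def make_partition_writer(out_dir, stem, partition_idx, npartitions, schema):
    # Open a buffered binary file for this partition; the caller keeps the
    # handle and closes it after the writer is finalized, mirroring how the
    # method above tracks self._partition_files alongside the writers.
    path = os.path.join(out_dir, f"{stem}-{partition_idx}.parquet")
    sink = open(path, "wb", buffering=1 << 20)
    # Attach partition info as parquet key-value metadata (JSON here; the
    # real code serializes PartitionInfo objects instead).
    meta = {PART_KEY: json.dumps({"index": partition_idx, "total": npartitions}).encode()}
    writer = pq.ParquetWriter(where=sink, schema=schema.with_metadata(meta), compression="zstd")
    return writer, sink

# Usage: write one batch to partition 0 of 4, then read the metadata back.
schema = pa.schema([("id", pa.int64())])
out_dir = tempfile.mkdtemp()
writer, sink = make_partition_writer(out_dir, "out", 0, 4, schema)
writer.write_table(pa.table({"id": [1, 2, 3]}))
writer.close()
sink.close()
# The metadata round-trips: a reader can recover which partition the file holds.
print(pq.read_schema(os.path.join(out_dir, "out-0.parquet")).metadata[PART_KEY])

Keeping the file handle separate from the writer is the notable design choice: the writer finalizes the parquet footer on close, while the buffered file object is flushed and closed by the owner, which is why the method above stores both in per-partition maps.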