in smallpond/execution/task.py [0:0]
def _create_empty_file(self, partition_idx: int, dataset: DataSet) -> str:
    """
    Create an empty output file for the given partition, matching the schema of the dataset.
    Return the path relative to the runtime output directory.
    """
    # With hive partitioning enabled, place the file under a "column=value" subdirectory;
    # otherwise write it directly into the runtime output directory.
    if isinstance(self, HashPartitionDuckDbTask) and self.hive_partitioning:
        empty_file_prefix = os.path.join(
            self.runtime_output_abspath,
            f"{self.data_partition_column}={partition_idx}",
            f"{self.output_filename}-{partition_idx}-empty",
        )
        Path(empty_file_prefix).parent.mkdir(exist_ok=True)
    else:
        empty_file_prefix = os.path.join(
            self.runtime_output_abspath,
            f"{self.output_filename}-{partition_idx}-empty",
        )
    if isinstance(dataset, CsvDataSet):
        # CSV and JSON files carry no embedded schema, so a zero-byte file suffices.
        empty_file_path = Path(empty_file_prefix + ".csv")
        empty_file_path.touch()
    elif isinstance(dataset, JsonDataSet):
        empty_file_path = Path(empty_file_prefix + ".json")
        empty_file_path.touch()
    elif isinstance(dataset, ParquetDataSet):
        # Parquet files must embed a schema, so derive it from the dataset
        # without materializing any rows.
        with duckdb.connect(database=":memory:") as conn:
            conn.sql("SET threads TO 1")
            dataset_schema = dataset.to_batch_reader(batch_size=1, conn=conn).schema
        # Record partition info as key-value metadata; hash-partition tasks also
        # record the data partition column.
        extra_partitions = (
            [PartitionInfo(partition_idx, self.npartitions, self.dimension)]
            if not isinstance(self, HashPartitionTask)
            else [
                PartitionInfo(partition_idx, self.npartitions, self.dimension),
                PartitionInfo(partition_idx, self.npartitions, self.data_partition_column),
            ]
        )
        schema_with_metadata = filter_schema(dataset_schema, excluded_cols=GENERATED_COLUMNS).with_metadata(
            self.parquet_kv_metadata_bytes(extra_partitions)
        )
        # Opening and immediately closing a ParquetWriter produces a valid
        # zero-row file that still carries the schema and metadata.
        empty_file_path = Path(empty_file_prefix + ".parquet")
        parquet.ParquetWriter(empty_file_path, schema_with_metadata).close()
    else:
        raise ValueError(f"unsupported dataset type: {type(dataset)}")
    return str(empty_file_path.relative_to(self.runtime_output_abspath))
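
# Illustration (not part of task.py): a minimal, self-contained sketch of the
# ParquetDataSet branch above. It derives an Arrow schema through DuckDB without
# reading any rows, attaches key-value metadata, and produces a valid zero-row
# Parquet file by closing a ParquetWriter immediately. File names, column names,
# and metadata keys are illustrative placeholders, not smallpond's API.
import duckdb
import pyarrow as pa
import pyarrow.parquet as pq

# Stand-in for an existing partition of the input dataset.
pq.write_table(pa.table({"user_id": [1, 2], "payload": ["a", "b"]}), "input-0.parquet")

with duckdb.connect(database=":memory:") as conn:
    conn.sql("SET threads TO 1")
    # LIMIT 0 keeps the result empty; only the schema is needed.
    schema = conn.sql("SELECT * FROM read_parquet('input-0.parquet') LIMIT 0").arrow().schema

# Record partition information as Parquet key-value metadata (placeholder keys/values).
schema = schema.with_metadata({b"partition_index": b"3", b"data_partition_column": b"user_id"})

# Opening and immediately closing the writer yields a readable file with the
# schema and metadata but no row groups.
pq.ParquetWriter("output-3-empty.parquet", schema).close()

empty = pq.read_table("output-3-empty.parquet")
print(empty.num_rows, empty.schema.names)  # 0 ['user_id', 'payload']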