def _create_empty_file()

in smallpond/execution/task.py


    def _create_empty_file(self, partition_idx: int, dataset: DataSet) -> str:
        """
        Create an empty file for a partition according to the schema of the dataset.
        Return the path relative to the output directory.
        """
        if isinstance(self, HashPartitionDuckDbTask) and self.hive_partitioning:
            empty_file_prefix = os.path.join(
                self.runtime_output_abspath,
                f"{self.data_partition_column}={partition_idx}",
                f"{self.output_filename}-{partition_idx}-empty",
            )
            Path(empty_file_prefix).parent.mkdir(parents=True, exist_ok=True)
        else:
            empty_file_prefix = os.path.join(
                self.runtime_output_abspath,
                f"{self.output_filename}-{partition_idx}-empty",
            )

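        # Empty CSV and JSON outputs are created as zero-byte files; an empty
        # Parquet file still needs a real footer carrying the schema.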
        if isinstance(dataset, CsvDataSet):
            empty_file_path = Path(empty_file_prefix + ".csv")
            empty_file_path.touch()
        elif isinstance(dataset, JsonDataSet):
            empty_file_path = Path(empty_file_prefix + ".json")
            empty_file_path.touch()
        elif isinstance(dataset, ParquetDataSet):
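            # Probe the dataset's Arrow schema through a throwaway in-memory
            # DuckDB connection; the batch reader is used only for its schema.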
            with duckdb.connect(database=":memory:") as conn:
                conn.sql(f"SET threads TO 1")
                dataset_schema = dataset.to_batch_reader(batch_size=1, conn=conn).schema
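            # Stamp the partition identity into the file metadata; hash
            # partition tasks also record the column the data was hashed on.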
            extra_partitions = [PartitionInfo(partition_idx, self.npartitions, self.dimension)]
            if isinstance(self, HashPartitionTask):
                extra_partitions.append(
                    PartitionInfo(partition_idx, self.npartitions, self.data_partition_column)
                )
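            # Exclude generated columns from the schema and attach the
            # partition info as Parquet key-value metadata.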
            schema_with_metadata = filter_schema(dataset_schema, excluded_cols=GENERATED_COLUMNS).with_metadata(
                self.parquet_kv_metadata_bytes(extra_partitions)
            )
            empty_file_path = Path(empty_file_prefix + ".parquet")
            parquet.ParquetWriter(empty_file_path, schema_with_metadata).close()
        else:
            raise ValueError(f"unsupported dataset type: {type(dataset)}")

        return str(empty_file_path.relative_to(self.runtime_output_abspath))
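
The ParquetWriter open-and-close pattern above is what lets an otherwise empty partition still deliver its schema and key-value metadata to readers. Below is a minimal standalone sketch of the same idea using only pyarrow; the schema, metadata key, and file name are hypothetical stand-ins for what the task derives at runtime:

    import pyarrow as pa
    import pyarrow.parquet as parquet

    # Hypothetical schema standing in for the one probed from the dataset.
    schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
    # Attach key-value metadata, playing the role of parquet_kv_metadata_bytes().
    schema_with_metadata = schema.with_metadata({b"partition": b"0"})

    # Opening and immediately closing a ParquetWriter yields a valid zero-row
    # file whose footer carries the schema and the attached metadata.
    parquet.ParquetWriter("example-0-empty.parquet", schema_with_metadata).close()

    # Readers recover the schema and metadata even though there are no rows.
    table = parquet.read_table("example-0-empty.parquet")
    assert table.num_rows == 0
    assert table.schema.names == ["user_id", "name"]
    assert table.schema.metadata[b"partition"] == b"0"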