in src/dataframe.rs [621:674]
fn write_parquet(
    &self,
    path: &str,
    compression: &str,
    compression_level: Option<u32>,
    py: Python,
) -> PyDataFusionResult<()> {
    // Codecs that take a level (brotli, zstd) require it explicitly;
    // gzip falls back to level 6 below.
    fn verify_compression_level(cl: Option<u32>) -> Result<u32, PyErr> {
        cl.ok_or_else(|| PyValueError::new_err("compression_level is not defined"))
    }

    // Validate the codec name and level eagerly so bad input surfaces as a
    // clear Python error before any write starts. The parsed value itself is
    // discarded because DataFusion receives the compression as a string below.
    let _validated = match compression.to_lowercase().as_str() {
        "snappy" => Compression::SNAPPY,
        "gzip" => Compression::GZIP(
            GzipLevel::try_new(compression_level.unwrap_or(6))
                .map_err(|e| PyValueError::new_err(format!("{e}")))?,
        ),
        "brotli" => Compression::BROTLI(
            BrotliLevel::try_new(verify_compression_level(compression_level)?)
                .map_err(|e| PyValueError::new_err(format!("{e}")))?,
        ),
        "zstd" => Compression::ZSTD(
            ZstdLevel::try_new(verify_compression_level(compression_level)? as i32)
                .map_err(|e| PyValueError::new_err(format!("{e}")))?,
        ),
        "lzo" => Compression::LZO,
        "lz4" => Compression::LZ4,
        "lz4_raw" => Compression::LZ4_RAW,
        "uncompressed" => Compression::UNCOMPRESSED,
        _ => {
            return Err(PyDataFusionError::Common(format!(
                "Unrecognized compression type {compression}"
            )));
        }
    };

    // Build the compression string in the form DataFusion expects: the codec
    // name, with the level (if any) appended in parentheses, e.g. "zstd(3)".
    let mut compression_string = compression.to_string();
    if let Some(level) = compression_level {
        compression_string.push_str(&format!("({level})"));
    }

    let mut options = TableParquetOptions::default();
    options.global.compression = Some(compression_string);

    // Block on the async write, yielding via the held GIL token.
    wait_for_future(
        py,
        self.df.as_ref().clone().write_parquet(
            path,
            DataFrameWriteOptions::new(),
            Some(options),
        ),
    )?;
    Ok(())
}
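
A minimal, self-contained sketch of the compression-string convention used above; the helper `compression_string` is hypothetical (not part of the library), and only illustrates how the codec name and optional level are combined before being handed to `TableParquetOptions`:

```rust
// Hypothetical helper mirroring the string-building step in write_parquet:
// the codec name, optionally followed by the level in parentheses.
fn compression_string(codec: &str, level: Option<u32>) -> String {
    match level {
        Some(l) => format!("{codec}({l})"),
        None => codec.to_string(),
    }
}

fn main() {
    // "zstd" with level 3 becomes "zstd(3)"; level-less codecs stay bare.
    assert_eq!(compression_string("zstd", Some(3)), "zstd(3)");
    assert_eq!(compression_string("snappy", None), "snappy");
}
```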