// native/core/src/parquet/parquet_support.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::execution::operators::ExecutionError;
use arrow::{
array::{
cast::AsArray, new_null_array, types::Int32Type, types::TimestampMicrosecondType, Array,
ArrayRef, DictionaryArray, StructArray,
},
compute::{cast_with_options, take, CastOptions},
datatypes::{DataType, TimeUnit},
util::display::FormatOptions,
};
use datafusion::common::{Result as DataFusionResult, ScalarValue};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::ColumnarValue;
use datafusion_comet_spark_expr::EvalMode;
use object_store::path::Path;
use object_store::{parse_url, ObjectStore};
use std::collections::HashMap;
use std::{fmt::Debug, hash::Hash, sync::Arc};
use url::Url;

static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");
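
/// Options for the fallback `cast_with_options` calls in this module: `safe: true` turns
/// failed conversions into nulls instead of errors, and timestamps are rendered using the
/// format defined above.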
static PARQUET_OPTIONS: CastOptions = CastOptions {
safe: true,
format_options: FormatOptions::new()
.with_timestamp_tz_format(TIMESTAMP_FORMAT)
.with_timestamp_format(TIMESTAMP_FORMAT),
};

/// Spark cast options
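///
/// Illustrative usage sketch (an assumption about a typical caller, not taken from the
/// upstream code; marked `ignore` because it is not a doctest). The constructors default the
/// adapter-specific flags to `false`, so a Parquet SchemaAdapter caller would typically
/// toggle them after construction:
///
/// ```ignore
/// let mut opts = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
/// opts.allow_cast_unsigned_ints = true;
/// opts.is_adapting_schema = true;
/// ```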
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct SparkParquetOptions {
/// Spark evaluation mode
pub eval_mode: EvalMode,
    /// When casting from/to timezone-related types, we need a timezone, which will be resolved
    /// to the session-local timezone by an analyzer in Spark.
// TODO we should change timezone to Tz to avoid repeated parsing
pub timezone: String,
/// Allow casts that are supported but not guaranteed to be 100% compatible
pub allow_incompat: bool,
/// Support casting unsigned ints to signed ints (used by Parquet SchemaAdapter)
pub allow_cast_unsigned_ints: bool,
/// We also use the cast logic for adapting Parquet schemas, so this flag is used
/// for that use case
pub is_adapting_schema: bool,
    /// Whether to always represent decimals using 128 bits. If false, the native reader may
    /// represent decimals using 32 or 64 bits, depending on the precision.
pub use_decimal_128: bool,
    /// Whether to read dates/timestamps that were written in the legacy hybrid Julian + Gregorian
    /// calendar as-is. If false, throw exceptions instead. If the Spark type is TimestampNTZ,
    /// this should be true.
pub use_legacy_date_timestamp_or_ntz: bool,
    /// Whether schema field names are case-sensitive
pub case_sensitive: bool,
}

impl SparkParquetOptions {
pub fn new(eval_mode: EvalMode, timezone: &str, allow_incompat: bool) -> Self {
Self {
eval_mode,
timezone: timezone.to_string(),
allow_incompat,
allow_cast_unsigned_ints: false,
is_adapting_schema: false,
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
}
    }

pub fn new_without_timezone(eval_mode: EvalMode, allow_incompat: bool) -> Self {
Self {
eval_mode,
timezone: "".to_string(),
allow_incompat,
allow_cast_unsigned_ints: false,
is_adapting_schema: false,
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
}
}
}

/// Spark-compatible cast implementation. Defers to DataFusion's cast where that is known
/// to be compatible, and returns an error when an unsupported, non-DataFusion-compatible cast
/// is requested.
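///
/// Illustrative sketch (an assumption about typical usage; `EvalMode::Legacy` and `"UTC"`
/// are arbitrary choices, and the block is marked `ignore` because it is not a doctest):
/// widening an INT32 Parquet column to the INT64 type requested by the Spark read schema.
///
/// ```ignore
/// use arrow::array::{ArrayRef, Int32Array};
/// use arrow::datatypes::DataType;
/// use datafusion::physical_plan::ColumnarValue;
/// use std::sync::Arc;
///
/// let opts = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
/// let input: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
/// let converted =
///     spark_parquet_convert(ColumnarValue::Array(input), &DataType::Int64, &opts)?;
/// ```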
pub fn spark_parquet_convert(
arg: ColumnarValue,
data_type: &DataType,
parquet_options: &SparkParquetOptions,
) -> DataFusionResult<ColumnarValue> {
match arg {
ColumnarValue::Array(array) => Ok(ColumnarValue::Array(cast_array(
array,
data_type,
parquet_options,
)?)),
ColumnarValue::Scalar(scalar) => {
            // Note that normally CAST(scalar) should be folded on the Spark JVM side. However,
            // in some cases, e.g. a scalar subquery, Spark will not fold it, so we need to
            // handle it here.
let array = scalar.to_array()?;
let scalar =
ScalarValue::try_from_array(&cast_array(array, data_type, parquet_options)?, 0)?;
Ok(ColumnarValue::Scalar(scalar))
}
}
}
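
/// Adapts `array` to `to_type`, recursing into nested types: dictionary-encoded string
/// columns are cast through their values (and flattened with `take` when the target type is
/// not a dictionary), struct columns are adapted field by field, microsecond timestamps
/// without a timezone are reinterpreted with the target timezone, and every other case falls
/// back to arrow's `cast_with_options` using `PARQUET_OPTIONS`.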
fn cast_array(
array: ArrayRef,
to_type: &DataType,
parquet_options: &SparkParquetOptions,
) -> DataFusionResult<ArrayRef> {
use DataType::*;
let from_type = array.data_type().clone();
let array = match &from_type {
Dictionary(key_type, value_type)
if key_type.as_ref() == &Int32
&& (value_type.as_ref() == &Utf8 || value_type.as_ref() == &LargeUtf8) =>
{
let dict_array = array
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.expect("Expected a dictionary array");
let casted_dictionary = DictionaryArray::<Int32Type>::new(
dict_array.keys().clone(),
cast_array(Arc::clone(dict_array.values()), to_type, parquet_options)?,
);
let casted_result = match to_type {
Dictionary(_, _) => Arc::new(casted_dictionary.clone()),
_ => take(casted_dictionary.values().as_ref(), dict_array.keys(), None)?,
};
return Ok(casted_result);
}
_ => array,
};
let from_type = array.data_type();
match (from_type, to_type) {
(Struct(_), Struct(_)) => Ok(cast_struct_to_struct(
array.as_struct(),
from_type,
to_type,
parquet_options,
)?),
(Timestamp(TimeUnit::Microsecond, None), Timestamp(TimeUnit::Microsecond, Some(tz))) => {
Ok(Arc::new(
array
.as_primitive::<TimestampMicrosecondType>()
.reinterpret_cast::<TimestampMicrosecondType>()
.with_timezone(Arc::clone(tz)),
))
}
_ => Ok(cast_with_options(&array, to_type, &PARQUET_OPTIONS)?),
}
}

/// Cast between struct types based on logic in
/// `org.apache.spark.sql.catalyst.expressions.Cast#castStruct`.
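///
/// Fields are matched by name; a field required by `to_type` that does not exist in
/// `from_type` is filled with nulls. A minimal sketch of that behavior (an illustrative
/// assumption, marked `ignore` since this function is private and the sketch is not a
/// doctest):
///
/// ```ignore
/// use arrow::array::{ArrayRef, Int32Array, StructArray};
/// use arrow::datatypes::{DataType, Field, Fields};
/// use std::sync::Arc;
///
/// // File schema: struct<a: int32>; required Spark schema: struct<a: int64, b: utf8>.
/// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
/// let file_struct = StructArray::new(
///     Fields::from(vec![Field::new("a", DataType::Int32, true)]),
///     vec![a],
///     None,
/// );
/// let to_type = DataType::Struct(Fields::from(vec![
///     Field::new("a", DataType::Int64, true),
///     Field::new("b", DataType::Utf8, true),
/// ]));
/// let opts = SparkParquetOptions::new_without_timezone(EvalMode::Legacy, false);
/// // Column `a` is widened to Int64; column `b` is absent from the file and comes back as
/// // all nulls.
/// let result = cast_struct_to_struct(&file_struct, file_struct.data_type(), &to_type, &opts)?;
/// ```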
fn cast_struct_to_struct(
array: &StructArray,
from_type: &DataType,
to_type: &DataType,
parquet_options: &SparkParquetOptions,
) -> DataFusionResult<ArrayRef> {
match (from_type, to_type) {
(DataType::Struct(from_fields), DataType::Struct(to_fields)) => {
// TODO some of this logic may be specific to converting Parquet to Spark
let mut field_name_to_index_map = HashMap::new();
for (i, field) in from_fields.iter().enumerate() {
field_name_to_index_map.insert(field.name(), i);
}
assert_eq!(field_name_to_index_map.len(), from_fields.len());
let mut cast_fields: Vec<ArrayRef> = Vec::with_capacity(to_fields.len());
for i in 0..to_fields.len() {
// Fields in the to_type schema may not exist in the from_type schema
// i.e. the required schema may have fields that the file does not
// have
if field_name_to_index_map.contains_key(to_fields[i].name()) {
let from_index = field_name_to_index_map[to_fields[i].name()];
let cast_field = cast_array(
Arc::clone(array.column(from_index)),
to_fields[i].data_type(),
parquet_options,
)?;
cast_fields.push(cast_field);
} else {
cast_fields.push(new_null_array(to_fields[i].data_type(), array.len()));
}
}
Ok(Arc::new(StructArray::new(
to_fields.clone(),
cast_fields,
array.nulls().cloned(),
)))
}
_ => unreachable!(),
}
}

/// Mirrors object_store::parse::parse_url for the hdfs object store.
#[cfg(feature = "hdfs")]
fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
match datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
{
Some(object_store) => {
let path = object_store.get_path(url.as_str());
Ok((Box::new(object_store), path))
}
_ => Err(object_store::Error::Generic {
store: "HadoopFileSystem",
source: "Could not create hdfs object store".into(),
}),
}
}
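
/// Fallback used when the `hdfs` feature is not enabled: always returns an error indicating
/// that HDFS support is not available in this build.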
#[cfg(not(feature = "hdfs"))]
fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
Err(object_store::Error::Generic {
store: "HadoopFileSystem",
source: "Hdfs support is not enabled in this build".into(),
})
}

/// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
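///
/// An `s3a` scheme is rewritten to `s3` before the store is resolved and registered.
/// Illustrative sketch (an assumption about usage; the bucket and key are made up, and the
/// block is marked `ignore` because it is not a doctest); it mirrors the expectations in the
/// tests below:
///
/// ```ignore
/// let (store_url, path) = prepare_object_store(
///     Arc::new(RuntimeEnv::default()),
///     "s3a://bucket/warehouse/part-00000.parquet".to_string(),
/// )?;
/// assert_eq!(store_url, ObjectStoreUrl::parse("s3://bucket")?);
/// assert_eq!(path, Path::from("/warehouse/part-00000.parquet"));
/// ```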
pub(crate) fn prepare_object_store(
runtime_env: Arc<RuntimeEnv>,
url: String,
) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
let mut url = Url::parse(url.as_str())
.map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL {url}: {e}")))?;
let mut scheme = url.scheme();
if scheme == "s3a" {
scheme = "s3";
url.set_scheme("s3").map_err(|_| {
ExecutionError::GeneralError("Could not convert scheme from s3a to s3".to_string())
})?;
}
let url_key = format!(
"{}://{}",
scheme,
&url[url::Position::BeforeHost..url::Position::AfterPort],
);
let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
parse_hdfs_url(&url)
} else {
parse_url(&url)
}
.map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
let object_store_url = ObjectStoreUrl::parse(url_key.clone())?;
runtime_env.register_object_store(&url, Arc::from(object_store));
Ok((object_store_url, object_store_path))
}

#[cfg(test)]
mod tests {
use crate::parquet::parquet_support::prepare_object_store;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
use object_store::path::Path;
use std::sync::Arc;
use url::Url;
#[cfg(not(feature = "hdfs"))]
#[test]
fn test_prepare_object_store() {
use crate::execution::operators::ExecutionError;
let local_file_system_url = "file:///comet/spark-warehouse/part-00000.snappy.parquet";
let s3_url = "s3a://test_bucket/comet/spark-warehouse/part-00000.snappy.parquet";
let hdfs_url = "hdfs://localhost:8020/comet/spark-warehouse/part-00000.snappy.parquet";
let all_urls = [local_file_system_url, s3_url, hdfs_url];
let expected: Vec<Result<(ObjectStoreUrl, Path), ExecutionError>> = vec![
Ok((
ObjectStoreUrl::parse("file://").unwrap(),
Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
)),
Ok((
ObjectStoreUrl::parse("s3://test_bucket").unwrap(),
Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
)),
Err(ExecutionError::GeneralError(
"Generic HadoopFileSystem error: Hdfs support is not enabled in this build"
                    .to_string(),
)),
];
for (i, url_str) in all_urls.iter().enumerate() {
let url = &Url::parse(url_str).unwrap();
let res = prepare_object_store(Arc::new(RuntimeEnv::default()), url.to_string());
let expected = expected.get(i).unwrap();
match expected {
Ok((o, p)) => {
let (r_o, r_p) = res.unwrap();
assert_eq!(r_o, *o);
assert_eq!(r_p, *p);
}
Err(e) => {
assert!(res.is_err());
let Err(res_e) = res else {
panic!("test failed")
};
assert_eq!(e.to_string(), res_e.to_string())
}
}
}
    }

    #[test]
#[cfg(feature = "hdfs")]
fn test_prepare_object_store() {
        // We use a local file system URL instead of an HDFS URL because the latter requires
        // a running namenode.
let hdfs_url = "file:///comet/spark-warehouse/part-00000.snappy.parquet";
let expected: (ObjectStoreUrl, Path) = (
ObjectStoreUrl::parse("file://").unwrap(),
Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
);
let url = &Url::parse(hdfs_url).unwrap();
let res = prepare_object_store(Arc::new(RuntimeEnv::default()), url.to_string());
let res = res.unwrap();
assert_eq!(res.0, expected.0);
assert_eq!(res.1, expected.1);
}
}