native/spark-expr/src/utils.rs (170 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use arrow::array::{ cast::as_primitive_array, types::{Int32Type, TimestampMicrosecondType}, }; use arrow::datatypes::{DataType, TimeUnit, DECIMAL128_MAX_PRECISION}; use std::sync::Arc; use crate::timezone::Tz; use arrow::array::types::TimestampMillisecondType; use arrow::datatypes::{MAX_DECIMAL128_FOR_EACH_PRECISION, MIN_DECIMAL128_FOR_EACH_PRECISION}; use arrow::error::ArrowError; use arrow::{ array::{as_dictionary_array, Array, ArrayRef, PrimitiveArray}, temporal_conversions::as_datetime, }; use chrono::{DateTime, Offset, TimeZone}; /// Preprocesses input arrays to add timezone information from Spark to Arrow array datatype or /// to apply timezone offset. // // We consider the following cases: // // | --------------------- | ------------ | ----------------- | -------------------------------- | // | Conversion | Input array | Timezone | Output array | // | --------------------- | ------------ | ----------------- | -------------------------------- | // | Timestamp -> | Array in UTC | Timezone of input | A timestamp with the timezone | // | Utf8 or Date32 | | | offset applied and timezone | // | | | | removed | // | --------------------- | ------------ | ----------------- | -------------------------------- | // | Timestamp -> | Array in UTC | Timezone of input | Same as input array | // | Timestamp w/Timezone| | | | // | --------------------- | ------------ | ----------------- | -------------------------------- | // | Timestamp_ntz -> | Array in | Timezone of input | Same as input array | // | Utf8 or Date32 | timezone | | | // | | session local| | | // | | timezone | | | // | --------------------- | ------------ | ----------------- | -------------------------------- | // | Timestamp_ntz -> | Array in | Timezone of input | Array in UTC and timezone | // | Timestamp w/Timezone | session local| | specified in input | // | | timezone | | | // | --------------------- | ------------ | ----------------- | -------------------------------- | // | Timestamp(_ntz) -> | | // | Any other type | Not Supported | // | --------------------- | ------------ | ----------------- | -------------------------------- | // pub fn array_with_timezone( array: ArrayRef, timezone: String, to_type: Option<&DataType>, ) -> Result<ArrayRef, ArrowError> { match array.data_type() { DataType::Timestamp(_, None) => { assert!(!timezone.is_empty()); match to_type { Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array), Some(DataType::Timestamp(_, Some(_))) => { timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str())) } _ => { // Not supported panic!( "Cannot convert from {:?} to {:?}", array.data_type(), to_type.unwrap() ) } } } DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => { assert!(!timezone.is_empty()); let array = as_primitive_array::<TimestampMicrosecondType>(&array); let array_with_timezone = array.clone().with_timezone(timezone.clone()); let array = Arc::new(array_with_timezone) as ArrayRef; match to_type { Some(DataType::Utf8) | Some(DataType::Date32) => { pre_timestamp_cast(array, timezone) } _ => Ok(array), } } DataType::Timestamp(TimeUnit::Millisecond, Some(_)) => { assert!(!timezone.is_empty()); let array = as_primitive_array::<TimestampMillisecondType>(&array); let array_with_timezone = array.clone().with_timezone(timezone.clone()); let array = Arc::new(array_with_timezone) as ArrayRef; match to_type { Some(DataType::Utf8) | Some(DataType::Date32) => { pre_timestamp_cast(array, timezone) } _ => Ok(array), } } DataType::Dictionary(_, value_type) if matches!(value_type.as_ref(), &DataType::Timestamp(_, _)) => { let dict = as_dictionary_array::<Int32Type>(&array); let array = as_primitive_array::<TimestampMicrosecondType>(dict.values()); let array_with_timezone = array_with_timezone(Arc::new(array.clone()) as ArrayRef, timezone, to_type)?; let dict = dict.with_values(array_with_timezone); Ok(Arc::new(dict)) } _ => Ok(array), } } fn datetime_cast_err(value: i64) -> ArrowError { ArrowError::CastError(format!( "Cannot convert TimestampMicrosecondType {value} to datetime. Comet only supports dates between Jan 1, 262145 BCE and Dec 31, 262143 CE", )) } /// Takes in a Timestamp(Microsecond, None) array and a timezone id, and returns /// a Timestamp(Microsecond, Some<_>) array. /// The understanding is that the input array has time in the timezone specified in the second /// argument. /// Parameters: /// array - input array of timestamp without timezone /// tz - timezone of the values in the input array /// to_timezone - timezone to change the input values to fn timestamp_ntz_to_timestamp( array: ArrayRef, tz: &str, to_timezone: Option<&str>, ) -> Result<ArrayRef, ArrowError> { assert!(!tz.is_empty()); match array.data_type() { DataType::Timestamp(TimeUnit::Microsecond, None) => { let array = as_primitive_array::<TimestampMicrosecondType>(&array); let tz: Tz = tz.parse()?; let array: PrimitiveArray<TimestampMicrosecondType> = array.try_unary(|value| { as_datetime::<TimestampMicrosecondType>(value) .ok_or_else(|| datetime_cast_err(value)) .map(|local_datetime| { let datetime: DateTime<Tz> = tz.from_local_datetime(&local_datetime).unwrap(); datetime.timestamp_micros() }) })?; let array_with_tz = if let Some(to_tz) = to_timezone { array.with_timezone(to_tz) } else { array }; Ok(Arc::new(array_with_tz)) } DataType::Timestamp(TimeUnit::Millisecond, None) => { let array = as_primitive_array::<TimestampMillisecondType>(&array); let tz: Tz = tz.parse()?; let array: PrimitiveArray<TimestampMillisecondType> = array.try_unary(|value| { as_datetime::<TimestampMillisecondType>(value) .ok_or_else(|| datetime_cast_err(value)) .map(|local_datetime| { let datetime: DateTime<Tz> = tz.from_local_datetime(&local_datetime).unwrap(); datetime.timestamp_millis() }) })?; let array_with_tz = if let Some(to_tz) = to_timezone { array.with_timezone(to_tz) } else { array }; Ok(Arc::new(array_with_tz)) } _ => Ok(array), } } /// This takes for special pre-casting cases of Spark. E.g., Timestamp to String. fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> Result<ArrayRef, ArrowError> { assert!(!timezone.is_empty()); match array.data_type() { DataType::Timestamp(_, _) => { // Spark doesn't output timezone while casting timestamp to string, but arrow's cast // kernel does if timezone exists. So we need to apply offset of timezone to array // timestamp value and remove timezone from array datatype. let array = as_primitive_array::<TimestampMicrosecondType>(&array); let tz: Tz = timezone.parse()?; let array: PrimitiveArray<TimestampMicrosecondType> = array.try_unary(|value| { as_datetime::<TimestampMicrosecondType>(value) .ok_or_else(|| datetime_cast_err(value)) .map(|datetime| { let offset = tz.offset_from_utc_datetime(&datetime).fix(); let datetime = datetime + offset; datetime.and_utc().timestamp_micros() }) })?; Ok(Arc::new(array)) } _ => Ok(array), } } /// Adapted from arrow-rs `validate_decimal_precision` but returns bool /// instead of Err to avoid the cost of formatting the error strings and is /// optimized to remove a memcpy that exists in the original function /// we can remove this code once we upgrade to a version of arrow-rs that /// includes https://github.com/apache/arrow-rs/pull/6419 #[inline] pub fn is_valid_decimal_precision(value: i128, precision: u8) -> bool { precision <= DECIMAL128_MAX_PRECISION && value >= MIN_DECIMAL128_FOR_EACH_PRECISION[precision as usize] && value <= MAX_DECIMAL128_FOR_EACH_PRECISION[precision as usize] } // These are borrowed from hashbrown crate: // https://github.com/rust-lang/hashbrown/blob/master/src/raw/mod.rs // On stable we can use #[cold] to get a equivalent effect: this attributes // suggests that the function is unlikely to be called #[inline] #[cold] pub fn cold() {} #[inline] pub fn likely(b: bool) -> bool { if !b { cold(); } b } #[inline] pub fn unlikely(b: bool) -> bool { if b { cold(); } b }