in datafusion/physical-expr/src/datetime_expressions.rs [524:691]
/// Core of `DATE_BIN`: validates the three arguments and applies the binning
/// function chosen by `Interval::bin_fn` to every source timestamp.
///
/// * `stride` - must be a non-null scalar `IntervalDayTime` or
///   `IntervalMonthDayNano`. A month-day-nano interval may carry either months
///   only, or days/nanoseconds only; mixing the two is rejected.
/// * `array` - the source timestamps, as a timestamp scalar or a timestamp
///   array of any of the four time units.
/// * `origin` - must be a non-null scalar `TimestampNanosecond`.
///
/// The result has the same time unit as the source. Sources that are not in
/// nanoseconds are multiplied up to nanoseconds before binning and divided
/// back down afterwards (integer division). Null inputs stay null.
///
/// # Errors
///
/// Returns `DataFusionError::Execution` when an argument has the wrong type,
/// when the stride does not fit in an `i64` number of nanoseconds, or when the
/// stride is zero. Returns `DataFusionError::NotImplemented` for array-valued
/// stride/origin arguments and for strides that mix months with days or
/// nanoseconds.
fn date_bin_impl(
    stride: &ColumnarValue,
    array: &ColumnarValue,
    origin: &ColumnarValue,
) -> Result<ColumnarValue> {
    // Shared by both interval flavours: `None` means the duration did not fit
    // in an i64 nanosecond count.
    let to_nanos_interval = |total: Option<i64>| {
        total.map(Interval::Nanoseconds).ok_or_else(|| {
            DataFusionError::Execution(
                "DATE_BIN stride argument is too large".to_string(),
            )
        })
    };

    let interval = match stride {
        ColumnarValue::Array(_) => {
            return Err(DataFusionError::NotImplemented(
                "DATE_BIN only supports literal values for the stride argument, not arrays"
                    .to_string(),
            ))
        }
        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
            let (days, ms) = IntervalDayTimeType::to_parts(*v);
            let span = Duration::days(days as i64) + Duration::milliseconds(ms as i64);
            to_nanos_interval(span.num_nanoseconds())?
        }
        ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
            match IntervalMonthDayNanoType::to_parts(*v) {
                // No month component: a plain fixed-length stride.
                (0, days, nanos) => {
                    let span =
                        Duration::days(days as i64) + Duration::nanoseconds(nanos);
                    to_nanos_interval(span.num_nanoseconds())?
                }
                // Month component only.
                (months, 0, 0) => Interval::Months(months as i64),
                // Months combined with days and/or nanoseconds.
                _ => {
                    return Err(DataFusionError::NotImplemented(
                        "DATE_BIN stride does not support combination of month, day and nanosecond intervals".to_string(),
                    ))
                }
            }
        }
        ColumnarValue::Scalar(other) => {
            return Err(DataFusionError::Execution(format!(
                "DATE_BIN expects stride argument to be an INTERVAL but got {}",
                other.get_datatype()
            )))
        }
    };

    let origin_ns = match origin {
        ColumnarValue::Array(_) => {
            return Err(DataFusionError::NotImplemented(
                "DATE_BIN only supports literal values for the origin argument, not arrays"
                    .to_string(),
            ))
        }
        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
        // NOTE(review): the message text below is reproduced verbatim
        // (including its spelling) because callers or tests may match on it.
        ColumnarValue::Scalar(other) => {
            return Err(DataFusionError::Execution(format!(
                "DATE_BIN expects origin argument to be a TIMESTAMP with nanosececond precision but got {}",
                other.get_datatype()
            )))
        }
    };

    let (step, bin) = interval.bin_fn();
    if step == 0 {
        return Err(DataFusionError::Execution(
            "DATE_BIN stride must be non-zero".to_string(),
        ));
    }

    // Bin a value expressed in a coarser unit: scale up to nanoseconds, bin,
    // then scale back down.
    let rebin = |value: i64, scale: i64| bin(step, value * scale, origin_ns) / scale;
    let bin_nanos = |x: Option<i64>| x.map(|ns| bin(step, ns, origin_ns));
    let bin_micros = |x: Option<i64>| x.map(|us| rebin(us, 1_000));
    let bin_millis = |x: Option<i64>| x.map(|ms| rebin(ms, 1_000_000));
    let bin_secs = |x: Option<i64>| x.map(|s| rebin(s, 1_000_000_000));

    let binned = match array {
        ColumnarValue::Scalar(scalar) => match scalar {
            ScalarValue::TimestampNanosecond(v, tz) => ColumnarValue::Scalar(
                ScalarValue::TimestampNanosecond(bin_nanos(*v), tz.clone()),
            ),
            ScalarValue::TimestampMicrosecond(v, tz) => ColumnarValue::Scalar(
                ScalarValue::TimestampMicrosecond(bin_micros(*v), tz.clone()),
            ),
            ScalarValue::TimestampMillisecond(v, tz) => ColumnarValue::Scalar(
                ScalarValue::TimestampMillisecond(bin_millis(*v), tz.clone()),
            ),
            ScalarValue::TimestampSecond(v, tz) => ColumnarValue::Scalar(
                ScalarValue::TimestampSecond(bin_secs(*v), tz.clone()),
            ),
            _ => {
                return Err(DataFusionError::Execution(
                    "DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
                        .to_string(),
                ));
            }
        },
        ColumnarValue::Array(values) => match values.data_type() {
            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                let out: TimestampNanosecondArray =
                    as_timestamp_nanosecond_array(values)?
                        .iter()
                        .map(bin_nanos)
                        .collect();
                ColumnarValue::Array(Arc::new(out))
            }
            DataType::Timestamp(TimeUnit::Microsecond, _) => {
                let out: TimestampMicrosecondArray =
                    as_timestamp_microsecond_array(values)?
                        .iter()
                        .map(bin_micros)
                        .collect();
                ColumnarValue::Array(Arc::new(out))
            }
            DataType::Timestamp(TimeUnit::Millisecond, _) => {
                let out: TimestampMillisecondArray =
                    as_timestamp_millisecond_array(values)?
                        .iter()
                        .map(bin_millis)
                        .collect();
                ColumnarValue::Array(Arc::new(out))
            }
            DataType::Timestamp(TimeUnit::Second, _) => {
                let out: TimestampSecondArray = as_timestamp_second_array(values)?
                    .iter()
                    .map(bin_secs)
                    .collect();
                ColumnarValue::Array(Arc::new(out))
            }
            _ => {
                return Err(DataFusionError::Execution(format!(
                    "DATE_BIN expects source argument to be a TIMESTAMP but got {}",
                    values.data_type()
                )))
            }
        },
    };

    Ok(binned)
}