fn date_bin_impl()

in datafusion/physical-expr/src/datetime_expressions.rs [524:691]


fn date_bin_impl(
    stride: &ColumnarValue,
    array: &ColumnarValue,
    origin: &ColumnarValue,
) -> Result<ColumnarValue> {
    let stride = match stride {
        ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
            let (days, ms) = IntervalDayTimeType::to_parts(*v);
            let nanos = (Duration::days(days as i64) + Duration::milliseconds(ms as i64))
                .num_nanoseconds();

            match nanos {
                Some(v) => Interval::Nanoseconds(v),
                _ => {
                    return Err(DataFusionError::Execution(
                        "DATE_BIN stride argument is too large".to_string(),
                    ))
                }
            }
        }
        ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);

            // If interval is months, its origin must be midnight of first date of the month
            if months != 0 {
                // Return error if days or nanos is not zero
                if days != 0 || nanos != 0 {
                    return Err(DataFusionError::NotImplemented(
                        "DATE_BIN stride does not support combination of month, day and nanosecond intervals".to_string(),
                    ));
                } else {
                    Interval::Months(months as i64)
                }
            } else {
                let nanos = (Duration::days(days as i64) + Duration::nanoseconds(nanos))
                    .num_nanoseconds();
                match nanos {
                    Some(v) => Interval::Nanoseconds(v),
                    _ => {
                        return Err(DataFusionError::Execution(
                            "DATE_BIN stride argument is too large".to_string(),
                        ))
                    }
                }
            }
        }
        ColumnarValue::Scalar(v) => {
            return Err(DataFusionError::Execution(format!(
                "DATE_BIN expects stride argument to be an INTERVAL but got {}",
                v.get_datatype()
            )))
        }
        ColumnarValue::Array(_) => return Err(DataFusionError::NotImplemented(
            "DATE_BIN only supports literal values for the stride argument, not arrays"
                .to_string(),
        )),
    };

    let origin = match origin {
        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
        ColumnarValue::Scalar(v) => {
            return Err(DataFusionError::Execution(format!(
                "DATE_BIN expects origin argument to be a TIMESTAMP with nanosececond precision but got {}",
                v.get_datatype()
            )))
        }
        ColumnarValue::Array(_) => return Err(DataFusionError::NotImplemented(
            "DATE_BIN only supports literal values for the origin argument, not arrays"
                .to_string(),
        )),
    };

    let (stride, stride_fn) = stride.bin_fn();

    // Return error if stride is 0
    if stride == 0 {
        return Err(DataFusionError::Execution(
            "DATE_BIN stride must be non-zero".to_string(),
        ));
    }

    let f_nanos = |x: Option<i64>| x.map(|x| stride_fn(stride, x, origin));
    let f_micros = |x: Option<i64>| {
        let scale = 1_000;
        x.map(|x| stride_fn(stride, x * scale, origin) / scale)
    };
    let f_millis = |x: Option<i64>| {
        let scale = 1_000_000;
        x.map(|x| stride_fn(stride, x * scale, origin) / scale)
    };
    let f_secs = |x: Option<i64>| {
        let scale = 1_000_000_000;
        x.map(|x| stride_fn(stride, x * scale, origin) / scale)
    };

    Ok(match array {
        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
                f_nanos(*v),
                tz_opt.clone(),
            ))
        }
        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
                f_micros(*v),
                tz_opt.clone(),
            ))
        }
        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
                f_millis(*v),
                tz_opt.clone(),
            ))
        }
        ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
            ColumnarValue::Scalar(ScalarValue::TimestampSecond(
                f_secs(*v),
                tz_opt.clone(),
            ))
        }
        ColumnarValue::Array(array) => match array.data_type() {
            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                let array = as_timestamp_nanosecond_array(array)?
                    .iter()
                    .map(f_nanos)
                    .collect::<TimestampNanosecondArray>();

                ColumnarValue::Array(Arc::new(array))
            }
            DataType::Timestamp(TimeUnit::Microsecond, _) => {
                let array = as_timestamp_microsecond_array(array)?
                    .iter()
                    .map(f_micros)
                    .collect::<TimestampMicrosecondArray>();

                ColumnarValue::Array(Arc::new(array))
            }
            DataType::Timestamp(TimeUnit::Millisecond, _) => {
                let array = as_timestamp_millisecond_array(array)?
                    .iter()
                    .map(f_millis)
                    .collect::<TimestampMillisecondArray>();

                ColumnarValue::Array(Arc::new(array))
            }
            DataType::Timestamp(TimeUnit::Second, _) => {
                let array = as_timestamp_second_array(array)?
                    .iter()
                    .map(f_secs)
                    .collect::<TimestampSecondArray>();

                ColumnarValue::Array(Arc::new(array))
            }
            _ => {
                return Err(DataFusionError::Execution(format!(
                    "DATE_BIN expects source argument to be a TIMESTAMP but got {}",
                    array.data_type()
                )))
            }
        },
        _ => {
            return Err(DataFusionError::Execution(
                "DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
                    .to_string(),
            ));
        }
    })
}