in crates/iceberg/src/transform/bucket.rs [138:231]
/// Applies the bucket transform to every element of `input`.
///
/// The concrete array type is selected from `input.data_type()`, each value
/// is hashed with the matching `bucket_*` helper, and the bucket numbers are
/// returned as an `Int32Array` wrapped in an `Arc`.
///
/// Nanosecond-precision inputs are first reduced to microseconds so they go
/// through the same `bucket_time` / `bucket_timestamp` helpers as the
/// microsecond-precision inputs.
///
/// # Errors
///
/// Returns an error of kind `FeatureUnsupported` when the input data type is
/// not one of the types handled below.
fn transform(&self, input: ArrayRef) -> crate::Result<ArrayRef> {
    // Every `downcast_ref(..).unwrap()` below is guarded by the `data_type()`
    // match arm it sits in, so a failure would be an internal invariant
    // violation rather than a recoverable condition.
    let res: arrow_array::Int32Array = match input.data_type() {
        DataType::Int32 => input
            .as_any()
            .downcast_ref::<arrow_array::Int32Array>()
            .unwrap()
            .unary(|v| self.bucket_int(v)),
        DataType::Int64 => input
            .as_any()
            .downcast_ref::<arrow_array::Int64Array>()
            .unwrap()
            .unary(|v| self.bucket_long(v)),
        DataType::Decimal128(_, _) => input
            .as_any()
            .downcast_ref::<arrow_array::Decimal128Array>()
            .unwrap()
            .unary(|v| self.bucket_decimal(v)),
        DataType::Date32 => input
            .as_any()
            .downcast_ref::<arrow_array::Date32Array>()
            .unwrap()
            .unary(|v| self.bucket_date(v)),
        DataType::Time64(TimeUnit::Microsecond) => input
            .as_any()
            .downcast_ref::<arrow_array::Time64MicrosecondArray>()
            .unwrap()
            .unary(|v| self.bucket_time(v)),
        DataType::Timestamp(TimeUnit::Microsecond, _) => input
            .as_any()
            .downcast_ref::<arrow_array::TimestampMicrosecondArray>()
            .unwrap()
            .unary(|v| self.bucket_timestamp(v)),
        // Nanoseconds -> microseconds. A time of day is non-negative, so
        // truncating division and floor division agree here.
        DataType::Time64(TimeUnit::Nanosecond) => input
            .as_any()
            .downcast_ref::<arrow_array::Time64NanosecondArray>()
            .unwrap()
            .unary(|v| self.bucket_time(v / 1000)),
        // Nanoseconds -> microseconds using floor division. Timestamps before
        // the epoch are negative, and plain `/` truncates toward zero, which
        // would map e.g. -1 ns to 0 µs instead of -1 µs and put the value in a
        // different bucket than the equivalent microsecond timestamp.
        DataType::Timestamp(TimeUnit::Nanosecond, _) => input
            .as_any()
            .downcast_ref::<arrow_array::TimestampNanosecondArray>()
            .unwrap()
            .unary(|v: i64| self.bucket_timestamp(v.div_euclid(1000))),
        // Variable-width types are iterated as `Option`s; `None` entries are
        // passed through unchanged and only present values are hashed.
        DataType::Utf8 => arrow_array::Int32Array::from_iter(
            input
                .as_any()
                .downcast_ref::<arrow_array::StringArray>()
                .unwrap()
                .iter()
                .map(|v| v.map(|v| self.bucket_str(v))),
        ),
        DataType::LargeUtf8 => arrow_array::Int32Array::from_iter(
            input
                .as_any()
                .downcast_ref::<arrow_array::LargeStringArray>()
                .unwrap()
                .iter()
                .map(|v| v.map(|v| self.bucket_str(v))),
        ),
        DataType::Binary => arrow_array::Int32Array::from_iter(
            input
                .as_any()
                .downcast_ref::<arrow_array::BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.map(|v| self.bucket_bytes(v))),
        ),
        DataType::LargeBinary => arrow_array::Int32Array::from_iter(
            input
                .as_any()
                .downcast_ref::<arrow_array::LargeBinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.map(|v| self.bucket_bytes(v))),
        ),
        DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter(
            input
                .as_any()
                .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.map(|v| self.bucket_bytes(v))),
        ),
        _ => {
            return Err(crate::Error::new(
                crate::ErrorKind::FeatureUnsupported,
                format!(
                    "Unsupported data type for bucket transform: {:?}",
                    input.data_type()
                ),
            ))
        }
    };
    Ok(Arc::new(res))
}