in datafusion/datasource/src/write/demux.rs [342:504]
/// Computes the hive partition value of every row of `rb` for each partition column.
///
/// The result is indexed as `result[column_index][row_index]`, where columns follow the
/// order of `partition_by`. `Utf8`, `Utf8View` and dictionary-of-`Utf8` columns are
/// borrowed from `rb`. Booleans and numeric types are rendered to owned strings via
/// `Display`. `Date32`/`Date64` are rendered as ISO-8601 `yyyy-mm-dd`.
///
/// # Errors
///
/// - a partition column is not present in `rb`'s schema;
/// - a dictionary column's values are not `Utf8` (execution error);
/// - any other column type is not supported (not-implemented error);
/// - a date value is outside the range representable as a calendar date.
fn compute_partition_keys_by_row<'a>(
    rb: &'a RecordBatch,
    partition_by: &'a [(String, DataType)],
) -> Result<Vec<Vec<Cow<'a, str>>>> {
    /// Renders a day offset from the UNIX epoch (1970-01-01) as an ISO-8601/RFC3339
    /// `yyyy-mm-dd` string, returning an error instead of panicking when the offset
    /// cannot be represented as a `NaiveDate`.
    fn format_epoch_days(days_since_epoch: i64) -> Result<String> {
        // chrono's day number for 1970-01-01 when 0001-01-01 is counted as day 1.
        const EPOCH_DAYS_FROM_CE: i64 = 719_163;
        days_since_epoch
            .checked_add(EPOCH_DAYS_FROM_CE)
            .and_then(|days_from_ce| i32::try_from(days_from_ce).ok())
            .and_then(NaiveDate::from_num_days_from_ce_opt)
            .map(|date| date.format("%Y-%m-%d").to_string())
            .ok_or_else(|| {
                exec_datafusion_err!(
                    "date value {days_since_epoch} days from the UNIX epoch is out of range for a hive partition value"
                )
            })
    }

    const MILLIS_PER_DAY: i64 = 86_400_000;

    let num_rows = rb.num_rows();
    let mut all_partition_values = Vec::with_capacity(partition_by.len());
    // For the purposes of writing partitioned data, we can rely on schema inference
    // to determine the type of the partition cols in order to provide a more ergonomic
    // UI which does not require specifying DataTypes manually. So, we ignore the
    // DataType within the partition_by array and infer the correct type from the
    // batch schema instead.
    let schema = rb.schema();
    for (col, _) in partition_by.iter() {
        let dtype = schema.field_with_name(col)?.data_type();
        // Build the error lazily: it formats the whole schema.
        let col_array = rb.column_by_name(col).ok_or_else(|| {
            exec_datafusion_err!(
                "PartitionBy Column {} does not exist in source data! Got schema {schema}.",
                col
            )
        })?;
        // NOTE(review): values are read with `value(i)` without consulting the validity
        // bitmap, so null slots produce whatever the underlying buffer holds rather than
        // a dedicated null marker. Confirm this is intended.
        let mut partition_values: Vec<Cow<'a, str>> = Vec::with_capacity(num_rows);
        match dtype {
            DataType::Utf8 => {
                let array = as_string_array(col_array)?;
                partition_values.extend((0..num_rows).map(|i| Cow::from(array.value(i))));
            }
            DataType::Utf8View => {
                let array = as_string_view_array(col_array)?;
                partition_values.extend((0..num_rows).map(|i| Cow::from(array.value(i))));
            }
            DataType::Boolean => {
                let array = as_boolean_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::Date32 => {
                let array = as_date32_array(col_array)?;
                for i in 0..num_rows {
                    let date = format_epoch_days(i64::from(array.value(i)))?;
                    partition_values.push(Cow::from(date));
                }
            }
            DataType::Date64 => {
                let array = as_date64_array(col_array)?;
                for i in 0..num_rows {
                    // Floor division: a pre-1970 timestamp that is not a whole day must
                    // map to the day it falls in, not the following one.
                    let days = array.value(i).div_euclid(MILLIS_PER_DAY);
                    partition_values.push(Cow::from(format_epoch_days(days)?));
                }
            }
            DataType::Int8 => {
                let array = as_int8_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::Int16 => {
                let array = as_int16_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::Int32 => {
                let array = as_int32_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::Int64 => {
                let array = as_int64_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::UInt8 => {
                let array = as_uint8_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::UInt16 => {
                let array = as_uint16_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::UInt32 => {
                let array = as_uint32_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::UInt64 => {
                let array = as_uint64_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::Float16 => {
                let array = as_float16_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::Float32 => {
                let array = as_float32_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::Float64 => {
                let array = as_float64_array(col_array)?;
                partition_values
                    .extend((0..num_rows).map(|i| Cow::from(array.value(i).to_string())));
            }
            DataType::Dictionary(_, _) => {
                downcast_dictionary_array!(
                    col_array => {
                        // Only dictionaries whose values are `StringArray` are supported.
                        let array = col_array.downcast_dict::<StringArray>()
                            .ok_or_else(|| exec_datafusion_err!("it is not yet supported to write to hive partitions with datatype {}",
                            dtype))?;
                        for i in 0..num_rows {
                            partition_values.push(Cow::from(array.value(i)));
                        }
                    },
                    // `dtype` was just matched as `Dictionary`, so no other variant
                    // can reach here.
                    _ => unreachable!(),
                )
            }
            _ => {
                return not_impl_err!(
                    "it is not yet supported to write to hive partitions with datatype {}",
                    dtype
                )
            }
        }
        all_partition_values.push(partition_values);
    }
    Ok(all_partition_values)
}