in parquet/src/record/reader.rs [106:326]
/// Recursively builds the [`Reader`] for `field` and everything nested below it.
///
/// * `field` - schema node to build a reader for.
/// * `path` - names of the ancestors of `field`. The name of `field` is pushed
///   while its subtree is processed and removed again before a successful return,
///   so on success the vector is left exactly as it was passed in.
/// * `curr_def_level` / `curr_rep_level` - levels of the enclosing node. They are
///   adjusted locally for the repetition of `field` (`OPTIONAL` bumps the
///   definition level, `REPEATED` bumps both) before being used or passed down.
/// * `paths` - maps a leaf column path to the column index that is handed to
///   `row_group_reader`.
/// * `row_group_reader` - source of column metadata and column readers.
///
/// # Errors
///
/// Returns an error if a primitive column path is missing from `paths`, if the
/// column reader cannot be obtained, if a `LIST` repeated group has no child
/// field, if rebuilding an un-annotated repeated group as a required group fails,
/// or if any recursive call fails.
///
/// # Panics
///
/// Panics if `field` has no repetition, or if a `LIST`/`MAP`-annotated group does
/// not have the shape checked by the assertions below.
fn reader_tree(
    &self,
    field: TypePtr,
    path: &mut Vec<String>,
    mut curr_def_level: i16,
    mut curr_rep_level: i16,
    paths: &HashMap<ColumnPath, usize>,
    row_group_reader: &dyn RowGroupReader,
) -> Result<Reader> {
    assert!(field.get_basic_info().has_repetition());
    // Update current definition and repetition levels for this type
    let repetition = field.get_basic_info().repetition();
    match repetition {
        Repetition::OPTIONAL => {
            curr_def_level += 1;
        }
        Repetition::REPEATED => {
            curr_def_level += 1;
            curr_rep_level += 1;
        }
        _ => {}
    }

    path.push(String::from(field.name()));
    let reader = if field.is_primitive() {
        let col_path = ColumnPath::new(path.to_vec());
        // Build the error lazily: the lookup succeeds for every valid column, so
        // formatting the message up front would be wasted work.
        let orig_index = *paths
            .get(&col_path)
            .ok_or_else(|| general_err!("Path {:?} not found", col_path))?;
        let col_descr = row_group_reader
            .metadata()
            .column(orig_index)
            .column_descr_ptr();
        let col_reader = row_group_reader.get_column_reader(orig_index)?;
        let column = TripletIter::new(col_descr, col_reader, self.batch_size);
        let reader = Reader::PrimitiveReader(field.clone(), Box::new(column));
        if repetition == Repetition::REPEATED {
            // The levels were already bumped for this repeated field above; the
            // wrapping reader is given the levels from before that bump.
            Reader::RepeatedReader(
                field,
                curr_def_level - 1,
                curr_rep_level - 1,
                Box::new(reader),
            )
        } else {
            reader
        }
    } else {
        match field.get_basic_info().converted_type() {
            // List types
            ConvertedType::LIST => {
                assert_eq!(field.get_fields().len(), 1, "Invalid list type {field:?}");

                let repeated_field = field.get_fields()[0].clone();
                assert_eq!(
                    repeated_field.get_basic_info().repetition(),
                    Repetition::REPEATED,
                    "Invalid list type {field:?}"
                );

                if Reader::is_element_type(&repeated_field) {
                    // Support for backward compatible lists
                    let reader = self.reader_tree(
                        repeated_field,
                        path,
                        curr_def_level,
                        curr_rep_level,
                        paths,
                        row_group_reader,
                    )?;

                    Reader::RepeatedReader(
                        field,
                        curr_def_level,
                        curr_rep_level,
                        Box::new(reader),
                    )
                } else {
                    // A repeated group without any child cannot describe a list
                    // element; report it as an error instead of panicking on an
                    // out-of-bounds index.
                    let child_field = repeated_field
                        .get_fields()
                        .first()
                        .cloned()
                        .ok_or_else(|| general_err!("Invalid list type {:?}", field))?;

                    // The repeated group is skipped as a reader but is still part
                    // of the column path, and it contributes one definition and
                    // one repetition level.
                    path.push(String::from(repeated_field.name()));

                    let reader = self.reader_tree(
                        child_field,
                        path,
                        curr_def_level + 1,
                        curr_rep_level + 1,
                        paths,
                        row_group_reader,
                    )?;

                    path.pop();

                    Reader::RepeatedReader(
                        field,
                        curr_def_level,
                        curr_rep_level,
                        Box::new(reader),
                    )
                }
            }
            // Map types (key-value pairs)
            ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
                assert_eq!(field.get_fields().len(), 1, "Invalid map type: {field:?}");
                assert!(
                    !field.get_fields()[0].is_primitive(),
                    "Invalid map type: {field:?}"
                );

                let key_value_type = field.get_fields()[0].clone();
                assert_eq!(
                    key_value_type.get_basic_info().repetition(),
                    Repetition::REPEATED,
                    "Invalid map type: {field:?}"
                );
                // Parquet spec allows no value. In that case treat as a list. #1642
                if key_value_type.get_fields().len() != 1 {
                    // If not a list, then there can only be 2 fields in the struct
                    assert_eq!(
                        key_value_type.get_fields().len(),
                        2,
                        "Invalid map type: {field:?}"
                    );
                }

                // The key/value group is part of the column path of both children.
                path.push(String::from(key_value_type.name()));

                let key_type = &key_value_type.get_fields()[0];
                assert!(
                    key_type.is_primitive(),
                    "Map key type is expected to be a primitive type, but found {key_type:?}"
                );
                let key_reader = self.reader_tree(
                    key_type.clone(),
                    path,
                    curr_def_level + 1,
                    curr_rep_level + 1,
                    paths,
                    row_group_reader,
                )?;

                if key_value_type.get_fields().len() == 1 {
                    // Key-only map: read it as a list of keys.
                    path.pop();
                    Reader::RepeatedReader(
                        field,
                        curr_def_level,
                        curr_rep_level,
                        Box::new(key_reader),
                    )
                } else {
                    let value_type = &key_value_type.get_fields()[1];
                    let value_reader = self.reader_tree(
                        value_type.clone(),
                        path,
                        curr_def_level + 1,
                        curr_rep_level + 1,
                        paths,
                        row_group_reader,
                    )?;

                    path.pop();

                    Reader::KeyValueReader(
                        field,
                        curr_def_level,
                        curr_rep_level,
                        Box::new(key_reader),
                        Box::new(value_reader),
                    )
                }
            }
            // A repeated field that is neither contained by a `LIST`- or
            // `MAP`-annotated group nor annotated by `LIST` or `MAP`
            // should be interpreted as a required list of required
            // elements where the element type is the type of the field.
            _ if repetition == Repetition::REPEATED => {
                let required_field = Type::group_type_builder(field.name())
                    .with_repetition(Repetition::REQUIRED)
                    .with_converted_type(field.get_basic_info().converted_type())
                    .with_fields(field.get_fields().to_vec())
                    .build()?;

                // The recursive call pushes the (identical) name again, so remove
                // the entry pushed above first.
                path.pop();

                let reader = self.reader_tree(
                    Arc::new(required_field),
                    path,
                    curr_def_level,
                    curr_rep_level,
                    paths,
                    row_group_reader,
                )?;

                // Return directly: `path` has already been restored, and the
                // result must not go through the trailing `pop`/`Reader::option`.
                return Ok(Reader::RepeatedReader(
                    field,
                    curr_def_level - 1,
                    curr_rep_level - 1,
                    Box::new(reader),
                ));
            }
            // Group types (structs)
            _ => {
                let mut readers = Vec::new();
                for child in field.get_fields() {
                    let reader = self.reader_tree(
                        child.clone(),
                        path,
                        curr_def_level,
                        curr_rep_level,
                        paths,
                        row_group_reader,
                    )?;
                    readers.push(reader);
                }
                Reader::GroupReader(Some(field), curr_def_level, readers)
            }
        }
    };
    path.pop();

    Ok(Reader::option(repetition, curr_def_level, reader))
}