in src/parquet/arrow/reader.cc [760:876]
// Wraps the flat array in `*array` into nested ::arrow::ListArray layers, driven by
// the repetition and definition levels currently buffered in record_reader_.
//
// The Arrow field for this column (obtained through FromParquetSchema) is walked
// from the outermost type down to the leaf. Every LIST level on that path gets one
// Int32 offsets builder and one Boolean validity builder. The levels are then
// scanned once to fill those builders, and finally the values array is wrapped
// from the innermost list outwards.
//
// If the column's max repetition level is not positive, `*array` is left untouched.
//
// \param[in,out] array on entry the values array; on success it is replaced by the
//   outermost ListArray
// \return Status::OK() on success. Status::NotImplemented when a type on the path
//   has more than one child, or has a child but is not a LIST. Otherwise the first
//   non-OK Status produced by schema conversion or by a builder call.
Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<Array>* array) {
  const int16_t* definition_levels = record_reader_->def_levels();
  const int16_t* repetition_levels = record_reader_->rep_levels();
  const int64_t num_levels = record_reader_->levels_position();

  std::shared_ptr<::arrow::Schema> column_schema;
  RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
                                  input_->metadata()->key_value_metadata(),
                                  &column_schema));
  std::shared_ptr<Field> cursor = column_schema->field(0);

  // Nothing to wrap for a column that is not repeated.
  if (!(descr_->max_repetition_level() > 0)) {
    return Status::OK();
  }

  // Descend through the field tree, recording the nullability of every node and
  // creating one (offsets, validity) builder pair per list level.
  std::vector<bool> is_nullable;
  std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
  std::vector<std::shared_ptr<::arrow::BooleanBuilder>> validity_builders;
  is_nullable.push_back(cursor->nullable());
  while (cursor->type()->num_children() > 0) {
    if (cursor->type()->num_children() > 1) {
      return Status::NotImplemented(
          "Fields with more than one child are not supported.");
    }
    if (cursor->type()->id() != ::arrow::Type::LIST) {
      return Status::NotImplemented(
          "Currently only nesting with Lists is supported.");
    }
    cursor = cursor->type()->child(0);
    offset_builders.emplace_back(
        std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_));
    validity_builders.emplace_back(
        std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_));
    is_nullable.push_back(cursor->nullable());
  }
  const int64_t depth = static_cast<int64_t>(offset_builders.size());

  // Smallest definition level at which an entry occupies a slot in the primitive
  // values array: the max definition level, minus one when the leaf is nullable.
  const int16_t values_def_level = static_cast<int16_t>(
      descr_->max_definition_level() - (is_nullable.back() ? 1 : 0));

  // For each list level, the definition level that marks the list as empty
  // (as opposed to null). A nullable level consumes one extra definition level.
  std::vector<int16_t> empty_def_level(depth);
  int running_def_level = 0;
  for (int64_t level = 0; level < depth; ++level) {
    running_def_level += is_nullable[level] ? 1 : 0;
    empty_def_level[level] = static_cast<int16_t>(running_def_level);
    ++running_def_level;
  }

  int32_t values_offset = 0;
  std::vector<int64_t> null_counts(depth, 0);

  // Appends the current start offset for `level`: the innermost list points into
  // the values array, every other list points at the next entry of its child list.
  auto append_offset = [&](int64_t level) -> Status {
    if (level == depth - 1) {
      return offset_builders[level]->Append(values_offset);
    }
    return offset_builders[level]->Append(
        static_cast<int32_t>(offset_builders[level + 1]->length()));
  };

  for (int64_t pos = 0; pos < num_levels; ++pos) {
    const int16_t rep = repetition_levels[pos];
    const int16_t def = definition_levels[pos];

    // A repetition level below the maximum opens new lists, starting at list
    // level `rep` and continuing inwards until a null or empty list is reached.
    if (rep < descr_->max_repetition_level()) {
      for (int64_t level = rep; level < depth; ++level) {
        RETURN_NOT_OK(append_offset(level));

        const bool list_is_null =
            is_nullable[level] && ((empty_def_level[level] - 1) == def);
        RETURN_NOT_OK(validity_builders[level]->Append(!list_is_null));
        if (list_is_null) {
          null_counts[level]++;
          break;
        }
        if (empty_def_level[level] == def) {
          // Present but empty: no deeper list starts here.
          break;
        }
      }
    }

    if (def >= values_def_level) {
      values_offset++;
    }
  }

  // Close every list level with its terminating offset.
  for (int64_t level = 0; level < depth; ++level) {
    RETURN_NOT_OK(append_offset(level));
  }

  std::vector<std::shared_ptr<Buffer>> offset_buffers;
  std::vector<std::shared_ptr<Buffer>> validity_buffers;
  std::vector<int64_t> list_lengths;
  for (int64_t level = 0; level < depth; ++level) {
    // The list count is one less than the number of offsets. It is read before
    // Finish() on purpose; presumably Finish() resets the builder (TODO confirm).
    list_lengths.push_back(offset_builders[level]->length() - 1);

    std::shared_ptr<Array> finished_offsets;
    RETURN_NOT_OK(offset_builders[level]->Finish(&finished_offsets));
    offset_buffers.emplace_back(
        std::static_pointer_cast<Int32Array>(finished_offsets)->values());

    std::shared_ptr<Array> finished_validity;
    RETURN_NOT_OK(validity_builders[level]->Finish(&finished_validity));
    validity_buffers.emplace_back(
        std::static_pointer_cast<BooleanArray>(finished_validity)->values());
  }

  // Wrap from the innermost list outwards; the item field of each list takes the
  // nullability recorded for the node directly beneath that list.
  std::shared_ptr<Array> wrapped = *array;
  for (int64_t level = depth; level-- > 0;) {
    const bool item_nullable = is_nullable[level + 1];
    auto list_type =
        ::arrow::list(::arrow::field("item", wrapped->type(), item_nullable));
    wrapped = std::make_shared<::arrow::ListArray>(
        list_type, list_lengths[level], offset_buffers[level], wrapped,
        validity_buffers[level], null_counts[level]);
  }
  *array = wrapped;

  return Status::OK();
}