Status PrimitiveImpl::WrapIntoListArray()

in src/parquet/arrow/reader.cc [760:876]


// Reassembles the flat primitive values in `*array` into (possibly nested)
// ListArrays, using the repetition/definition levels buffered in
// record_reader_. On success `*array` is replaced by the outermost ListArray.
// Columns whose max_repetition_level() is 0 are returned untouched.
//
// Only chains of single-child LIST types are supported; any other nesting
// returns Status::NotImplemented.
Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<Array>* array) {
  const int16_t* def_levels = record_reader_->def_levels();
  const int16_t* rep_levels = record_reader_->rep_levels();
  const int64_t total_levels_read = record_reader_->levels_position();

  std::shared_ptr<::arrow::Schema> arrow_schema;
  RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
                                  input_->metadata()->key_value_metadata(),
                                  &arrow_schema));
  std::shared_ptr<Field> current_field = arrow_schema->field(0);

  if (descr_->max_repetition_level() > 0) {
    // Walk downwards through the list nesting. nullable[k] is the nullability
    // of the field at depth k; it ends up with one more entry than there are
    // list levels, the last entry describing the leaf (primitive) field.
    std::vector<bool> nullable;
    std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
    std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
    nullable.push_back(current_field->nullable());
    while (current_field->type()->num_children() > 0) {
      if (current_field->type()->num_children() > 1) {
        return Status::NotImplemented(
            "Fields with more than one child are not supported.");
      }
      if (current_field->type()->id() != ::arrow::Type::LIST) {
        return Status::NotImplemented(
            "Currently only nesting with Lists is supported.");
      }
      current_field = current_field->type()->child(0);
      offset_builders.emplace_back(
          std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_));
      valid_bits_builders.emplace_back(
          std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_));
      nullable.push_back(current_field->nullable());
    }

    // NOTE(review): the level loop below indexes list levels directly by
    // repetition level, i.e. it assumes list_depth equals
    // descr_->max_repetition_level(); nothing here verifies that.
    const int64_t list_depth = static_cast<int64_t>(offset_builders.size());

    // This describes the minimal definition that describes a level that
    // reflects a value in the primitive values array (a nullable leaf also
    // occupies a slot when it is null).
    int16_t values_def_level = descr_->max_definition_level();
    if (nullable.back()) {
      values_def_level--;
    }

    // The definition levels that are needed so that a list is declared
    // as empty and not null. A nullable level consumes one extra
    // definition level for its "present" state.
    std::vector<int16_t> empty_def_level(list_depth);
    int def_level = 0;
    for (int64_t i = 0; i < list_depth; i++) {
      if (nullable[i]) {
        def_level++;
      }
      empty_def_level[i] = static_cast<int16_t>(def_level);
      def_level++;
    }

    int32_t values_offset = 0;
    // Appends the next offset of list level j: for the innermost level this is
    // the number of leaf values consumed so far, for outer levels it is the
    // number of offsets already appended to the child level.
    auto append_offset = [&](int64_t j) -> Status {
      if (j == list_depth - 1) {
        return offset_builders[j]->Append(values_offset);
      }
      return offset_builders[j]->Append(
          static_cast<int32_t>(offset_builders[j + 1]->length()));
    };

    std::vector<int64_t> null_counts(list_depth, 0);
    for (int64_t i = 0; i < total_levels_read; i++) {
      const int16_t rep_level = rep_levels[i];
      if (rep_level < descr_->max_repetition_level()) {
        // A repetition level below the maximum starts a new entry at depth
        // rep_level and at every deeper level, until the definition level
        // shows that the list at some depth is null or empty.
        for (int64_t j = rep_level; j < list_depth; j++) {
          RETURN_NOT_OK(append_offset(j));

          if (nullable[j] && def_levels[i] == empty_def_level[j] - 1) {
            // The list at depth j is null: record it and stop descending.
            RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
            null_counts[j]++;
            break;
          }
          RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
          if (def_levels[i] == empty_def_level[j]) {
            // The list at depth j is present but empty.
            break;
          }
        }
      }
      if (def_levels[i] >= values_def_level) {
        values_offset++;
      }
    }

    // Close every list level with its final (end) offset.
    for (int64_t j = 0; j < list_depth; j++) {
      RETURN_NOT_OK(append_offset(j));
    }

    std::vector<std::shared_ptr<Buffer>> offsets;
    std::vector<std::shared_ptr<Buffer>> valid_bits;
    std::vector<int64_t> list_lengths;
    offsets.reserve(static_cast<size_t>(list_depth));
    valid_bits.reserve(static_cast<size_t>(list_depth));
    list_lengths.reserve(static_cast<size_t>(list_depth));
    for (int64_t j = 0; j < list_depth; j++) {
      // N lists need N + 1 offsets.
      list_lengths.push_back(offset_builders[j]->length() - 1);
      // Named `finished` so it does not shadow the `array` out-parameter.
      std::shared_ptr<Array> finished;
      RETURN_NOT_OK(offset_builders[j]->Finish(&finished));
      offsets.emplace_back(std::static_pointer_cast<Int32Array>(finished)->values());
      RETURN_NOT_OK(valid_bits_builders[j]->Finish(&finished));
      valid_bits.emplace_back(
          std::static_pointer_cast<BooleanArray>(finished)->values());
    }

    // Wrap from the innermost level outwards; nullable[j + 1] is the
    // nullability of level j's item field.
    std::shared_ptr<Array> output = *array;
    for (int64_t j = list_depth - 1; j >= 0; j--) {
      auto list_type =
          ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
      output = std::make_shared<::arrow::ListArray>(
          list_type, list_lengths[j], offsets[j], output, valid_bits[j], null_counts[j]);
    }
    *array = output;
  }
  return Status::OK();
}