void addVertexRecordBatchImpl()

in flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h [533:703]


  // Loads every vertex of label `v_label_id` from `v_files` into the fragment
  // and dumps the resulting vertex table into snapshot 0 of the loader's work
  // directory.
  //
  // The load runs in three phases:
  //   1. For each file, `supplier_creator` yields a set of record batch
  //      suppliers. One producer thread per supplier pushes batches into a
  //      bounded blocking queue. `thread_num` consumer threads pop them and
  //      register the primary keys with `indexer_builder`; that step is
  //      serialized by `mtxs_[v_label_id]`. Each consumer keeps its batches
  //      for phase 3.
  //   2. The indexer is finalized. The vertex table is grown if it has fewer
  //      rows than the total number of loaded records.
  //   3. `thread_num` workers map each retained batch's primary keys to
  //      vertex ids. They then write the remaining columns into the vertex
  //      table.
  //
  // Note: every record batch of the label stays in memory until phase 3
  // finishes.
  //
  // @param v_label_id       vertex label being loaded.
  // @param v_files          input files for that label, processed in order.
  // @param supplier_creator factory called once per file with
  //                         (label, file, loading_config_, thread_num). Each
  //                         returned supplier is drained by its own thread.
  void addVertexRecordBatchImpl(
      label_t v_label_id, const std::vector<std::string>& v_files,
      std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
          label_t, const std::string&, const LoadingConfig&, int)>
          supplier_creator) {
    // hardware_concurrency() returns 0 when the value is not computable.
    // With zero consumers nothing would drain the queue: producers would
    // block forever once it is full, and no key would be indexed. So always
    // run at least one worker.
    const unsigned hw_threads = std::thread::hardware_concurrency();
    const unsigned thread_num = hw_threads == 0 ? 1 : hw_threads;

    std::string v_label_name = schema_.get_vertex_label_name(v_label_id);
    VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label "
             << v_label_name;
    auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0];
    size_t primary_key_ind = std::get<2>(primary_key);
    grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
    queue.SetLimit(1024);
    PTIndexerBuilder<KEY_T, vid_t> indexer_builder;
    // batchs[i] collects the batches consumed by worker i in phase 1. The
    // same worker index processes them again in phase 3.
    std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> batchs(
        thread_num);
    std::vector<std::thread> work_threads;
    for (auto& v_file : v_files) {
      VLOG(10) << "Parsing vertex file:" << v_file << " for label "
               << v_label_name;
      auto record_batch_supplier_vec = supplier_creator(
          v_label_id, v_file, loading_config_, static_cast<int>(thread_num));
      queue.SetProducerNum(record_batch_supplier_vec.size());

      // Producers: one thread per supplier.
      for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
        work_threads.emplace_back(
            [&](size_t i) {
              auto& record_batch_supplier = record_batch_supplier_vec[i];
              bool first_batch = true;
              while (true) {
                auto batch = record_batch_supplier->GetNextBatch();
                if (!batch) {
                  queue.DecProducerNum();
                  break;
                }
                if (first_batch) {
                  // The header must have exactly one more column than the
                  // schema's property list. That extra column is the primary
                  // key, which phase 3 strips before writing properties.
                  auto header = batch->schema()->field_names();
                  auto schema_column_names =
                      schema_.get_vertex_property_names(v_label_id);
                  CHECK(schema_column_names.size() + 1 == header.size())
                      << "File header of size: " << header.size()
                      << " does not match schema column size: "
                      << schema_column_names.size() + 1;
                  first_batch = false;
                }
                queue.Put(batch);
              }
            },
            idx);
      }

      // Consumers: register primary keys and retain batches for phase 3.
      for (unsigned idx = 0; idx < thread_num; ++idx) {
        work_threads.emplace_back(
            [&](unsigned i) {
              while (true) {
                std::shared_ptr<arrow::RecordBatch> batch{nullptr};
                // A false return from Get() ends this consumer. Presumably
                // that happens once all producers are done and the queue is
                // empty (grape::BlockingQueue semantics; confirm).
                if (!queue.Get(batch)) {
                  break;
                }
                if (!batch) {
                  LOG(FATAL) << "get nullptr batch";
                }
                batchs[i].emplace_back(batch);
                const auto& columns = batch->columns();
                CHECK(primary_key_ind < columns.size());
                auto primary_key_column = columns[primary_key_ind];
                {
                  std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
                  _add_vertex<KEY_T>()(primary_key_column, indexer_builder);
                }
              }
            },
            idx);
      }
      for (auto& t : work_threads) {
        t.join();
      }
      work_threads.clear();

      VLOG(10) << "Finish parsing vertex file:" << v_file << " for label "
               << v_label_name;
    }
    basic_fragment_loader_.FinishAddingVertex(v_label_id, indexer_builder);
    const auto& indexer = basic_fragment_loader_.GetLFIndexer(v_label_id);

    auto& vtable = basic_fragment_loader_.GetVertexTable(v_label_id);
    size_t total_row_num = 0;
    for (const auto& worker_batches : batchs) {
      for (const auto& b : worker_batches) {
        total_row_num += b->num_rows();
      }
    }
    // Make sure every record can be written before the parallel fill below.
    if (total_row_num > vtable.row_num()) {
      std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
      if (total_row_num > vtable.row_num()) {
        LOG(INFO) << "Resize vertex table from " << vtable.row_num() << " to "
                  << total_row_num;
        vtable.resize(total_row_num);
      }
    }

    // Phase 3: each worker fills property columns for the batches it
    // collected in phase 1.
    for (unsigned w = 0; w < thread_num; ++w) {
      work_threads.emplace_back(
          [&](unsigned idx) {
            for (const auto& batch : batchs[idx]) {
              // Copy the column list once. The primary key is read from it
              // and then erased, leaving only the property columns.
              auto other_columns_array = batch->columns();
              auto primary_key_column = other_columns_array[primary_key_ind];
              size_t row_num = primary_key_column->length();
              std::vector<size_t> vids;
              vids.reserve(row_num);
              if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
                using arrow_array_t =
                    typename gs::TypeConverter<KEY_T>::ArrowArrayType;
                auto casted_array =
                    std::static_pointer_cast<arrow_array_t>(primary_key_column);
                for (size_t i = 0; i < row_num; ++i) {
                  vids.emplace_back(indexer.get_index(casted_array->Value(i)));
                }
              } else {
                if (primary_key_column->type()->Equals(arrow::utf8())) {
                  auto casted_array =
                      std::static_pointer_cast<arrow::StringArray>(
                          primary_key_column);
                  for (size_t i = 0; i < row_num; ++i) {
                    auto str = casted_array->GetView(i);
                    std::string_view str_view(str.data(), str.size());
                    vids.emplace_back(indexer.get_index(str_view));
                  }
                } else if (primary_key_column->type()->Equals(
                               arrow::large_utf8())) {
                  auto casted_array =
                      std::static_pointer_cast<arrow::LargeStringArray>(
                          primary_key_column);
                  for (size_t i = 0; i < row_num; ++i) {
                    auto str = casted_array->GetView(i);
                    std::string_view str_view(str.data(), str.size());
                    vids.emplace_back(indexer.get_index(str_view));
                  }
                } else {
                  // Fail loudly. Otherwise the property columns below would
                  // be written with an empty vid list.
                  LOG(FATAL) << "Unsupported primary key type: "
                             << primary_key_column->type()->ToString();
                }
              }
              other_columns_array.erase(other_columns_array.begin() +
                                        primary_key_ind);

              for (size_t j = 0; j < other_columns_array.size(); ++j) {
                auto chunked_array = std::make_shared<arrow::ChunkedArray>(
                    other_columns_array[j]);
                set_properties_column(vtable.column_ptrs()[j], chunked_array,
                                      vids);
              }
            }
          },
          w);
    }
    for (auto& t : work_threads) {
      t.join();
    }

    // Trim the table to the number of entries in the indexer, then persist it.
    vtable.resize(indexer.size());
    vtable.dump(vertex_table_prefix(v_label_name),
                snapshot_dir(basic_fragment_loader_.work_dir(), 0));

    VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label "
             << v_label_name;
  }