in flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h [533:703]
void addVertexRecordBatchImpl(
    label_t v_label_id, const std::vector<std::string>& v_files,
    std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
        label_t, const std::string&, const LoadingConfig&, int)>
        supplier_creator) {
  // Loads all vertex record batches of label `v_label_id` from `v_files`.
  //
  // Phase 1 (per file): `supplier_creator` yields a set of batch suppliers;
  //   one producer thread per supplier pushes its batches into a bounded
  //   queue, while worker threads pop them, remember them in a per-worker
  //   list and feed the primary-key column into the indexer builder (under
  //   the per-label mutex).
  // Phase 2: the indexer is finalized and the vertex table is grown if the
  //   number of rows read exceeds its current row count.
  // Phase 3: worker threads translate every primary key into its vertex id
  //   and write the remaining columns into the vertex table.
  // Finally the table is resized to the indexer size and dumped to the
  //   snapshot directory.
  //
  // v_label_id:       label whose vertices are loaded.
  // v_files:          input files, processed one after another.
  // supplier_creator: factory called once per file with
  //                   (label, file, loading config, worker count).
  const std::string v_label_name = schema_.get_vertex_label_name(v_label_id);
  VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label "
           << v_label_name;
  auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0];
  const size_t primary_key_ind = std::get<2>(primary_key);

  // hardware_concurrency() is allowed to return 0 when the value is not
  // computable. With zero workers nothing would drain the queue and no batch
  // would ever be indexed, so always run at least one worker.
  const unsigned hw_threads = std::thread::hardware_concurrency();
  const unsigned worker_num = hw_threads == 0 ? 1u : hw_threads;

  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
  queue.SetLimit(1024);
  PTIndexerBuilder<KEY_T, vid_t> indexer_builder;
  // batchs[w] holds the batches consumed by worker w; each worker only ever
  // touches its own slot, so no locking is needed on this container.
  std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> batchs(
      worker_num);
  std::vector<std::thread> work_threads;

  for (const auto& v_file : v_files) {
    VLOG(10) << "Parsing vertex file:" << v_file << " for label "
             << v_label_name;
    auto record_batch_supplier_vec = supplier_creator(
        v_label_id, v_file, loading_config_, static_cast<int>(worker_num));
    queue.SetProducerNum(record_batch_supplier_vec.size());
    work_threads.reserve(record_batch_supplier_vec.size() + worker_num);

    // Producers: one thread per supplier. All threads are joined before
    // `record_batch_supplier_vec` goes out of scope, so capturing it by
    // reference is safe.
    for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) {
      work_threads.emplace_back(
          [&](size_t supplier_id) {
            auto& record_batch_supplier =
                record_batch_supplier_vec[supplier_id];
            bool first_batch = true;
            while (true) {
              auto batch = record_batch_supplier->GetNextBatch();
              if (!batch) {
                // Supplier exhausted: tell the queue this producer is done.
                queue.DecProducerNum();
                break;
              }
              if (first_batch) {
                // Validate the column count once per supplier: the schema
                // property columns plus the primary-key column.
                auto header = batch->schema()->field_names();
                auto schema_column_names =
                    schema_.get_vertex_property_names(v_label_id);
                CHECK(schema_column_names.size() + 1 == header.size())
                    << "File header of size: " << header.size()
                    << " does not match schema column size: "
                    << schema_column_names.size() + 1;
                first_batch = false;
              }
              queue.Put(batch);
            }
          },
          idx);
    }

    // Consumers: stash each batch and register its primary keys.
    for (unsigned idx = 0; idx < worker_num; ++idx) {
      work_threads.emplace_back(
          [&](unsigned worker_id) {
            while (true) {
              std::shared_ptr<arrow::RecordBatch> batch{nullptr};
              if (!queue.Get(batch)) {
                break;
              }
              if (!batch) {
                LOG(FATAL) << "get nullptr batch";
              }
              batchs[worker_id].emplace_back(batch);
              const auto& columns = batch->columns();
              CHECK(primary_key_ind < columns.size());
              auto primary_key_column = columns[primary_key_ind];
              {
                // The indexer builder is shared by all workers.
                std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
                _add_vertex<KEY_T>()(primary_key_column, indexer_builder);
              }
            }
          },
          idx);
    }

    for (auto& t : work_threads) {
      t.join();
    }
    work_threads.clear();
    VLOG(10) << "Finish parsing vertex file:" << v_file << " for label "
             << v_label_name;
  }

  basic_fragment_loader_.FinishAddingVertex(v_label_id, indexer_builder);
  const auto& indexer = basic_fragment_loader_.GetLFIndexer(v_label_id);
  auto& vtable = basic_fragment_loader_.GetVertexTable(v_label_id);

  size_t total_row_num = 0;
  for (const auto& worker_batches : batchs) {
    for (const auto& b : worker_batches) {
      total_row_num += b->num_rows();
    }
  }
  if (total_row_num > vtable.row_num()) {
    std::unique_lock<std::mutex> lock(mtxs_[v_label_id]);
    // Re-check under the lock before growing the table.
    if (total_row_num > vtable.row_num()) {
      LOG(INFO) << "Resize vertex table from " << vtable.row_num() << " to "
                << total_row_num;
      vtable.resize(total_row_num);
    }
  }

  // Fill the property columns: worker w handles exactly the batches it
  // consumed earlier.
  work_threads.reserve(worker_num);
  for (unsigned i = 0; i < worker_num; ++i) {
    work_threads.emplace_back(
        [&](unsigned worker_id) {
          for (const auto& batch : batchs[worker_id]) {
            // Single copy of the column list; the primary-key column is
            // removed from it further below.
            auto property_columns = batch->columns();
            auto primary_key_column = property_columns[primary_key_ind];
            const size_t row_num = primary_key_column->length();
            std::vector<size_t> vids;
            vids.reserve(row_num);
            if constexpr (!std::is_same<std::string_view, KEY_T>::value) {
              using arrow_array_t =
                  typename gs::TypeConverter<KEY_T>::ArrowArrayType;
              auto casted_array =
                  std::static_pointer_cast<arrow_array_t>(primary_key_column);
              for (size_t row = 0; row < row_num; ++row) {
                vids.emplace_back(indexer.get_index(casted_array->Value(row)));
              }
            } else {
              if (primary_key_column->type()->Equals(arrow::utf8())) {
                auto casted_array =
                    std::static_pointer_cast<arrow::StringArray>(
                        primary_key_column);
                for (size_t row = 0; row < row_num; ++row) {
                  auto str = casted_array->GetView(row);
                  std::string_view str_view(str.data(), str.size());
                  vids.emplace_back(indexer.get_index(str_view));
                }
              } else if (primary_key_column->type()->Equals(
                             arrow::large_utf8())) {
                auto casted_array =
                    std::static_pointer_cast<arrow::LargeStringArray>(
                        primary_key_column);
                for (size_t row = 0; row < row_num; ++row) {
                  auto str = casted_array->GetView(row);
                  std::string_view str_view(str.data(), str.size());
                  vids.emplace_back(indexer.get_index(str_view));
                }
              } else {
                // Without this branch `vids` would stay empty while the
                // property columns still carry `row_num` rows.
                // NOTE(review): _add_vertex presumably rejects such columns
                // already - confirm.
                LOG(FATAL) << "Unsupported primary key column type: "
                           << primary_key_column->type()->ToString();
              }
            }
            property_columns.erase(property_columns.begin() +
                                   primary_key_ind);
            for (size_t j = 0; j < property_columns.size(); ++j) {
              auto chunked_array =
                  std::make_shared<arrow::ChunkedArray>(property_columns[j]);
              set_properties_column(vtable.column_ptrs()[j], chunked_array,
                                    vids);
            }
          }
        },
        i);
  }
  for (auto& t : work_threads) {
    t.join();
  }

  // Resize the table to the number of indexed vertices, then persist it.
  vtable.resize(indexer.size());
  vtable.dump(vertex_table_prefix(v_label_name),
              snapshot_dir(basic_fragment_loader_.work_dir(), 0));
  VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label "
           << v_label_name;
}