in flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h [772:1031]
// Loads all edge inputs in `e_files` for the triplet
// (src_label_id, dst_label_id, e_label_id) and hands the parsed edges plus
// in/out degree arrays to basic_fragment_loader_.PutEdges().
//
// For every input, `supplier_creator` returns a set of record batch suppliers.
// One producer thread per supplier pushes batches into a bounded queue; a pool
// of consumer threads pops them, validates the src/dst key columns (expected
// to be the first two columns) and calls _append_edges, which receives the
// consumer's own parsed_edges_vec slot and the shared atomic degree counters.
// When EDATA_T is RecordView, the property columns of each batch are also
// copied into the DualCsr edge table at rows reserved via an atomic offset.
//
// Parameters:
//   src_label_id / dst_label_id - vertex labels of the edge endpoints.
//   e_label_id                  - edge label.
//   e_files                     - inputs; supplier_creator is called once each.
//   supplier_creator            - builds the suppliers for one input; its last
//                                 argument is the number of parsing threads.
void addEdgesRecordBatchImplHelper(
    label_t src_label_id, label_t dst_label_id, label_t e_label_id,
    const std::vector<std::string>& e_files,
    std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
        label_t, label_t, label_t, const std::string&, const LoadingConfig&,
        int)>
        supplier_creator) {
  // Whether the per-thread edge buffers are file-backed and must be
  // opened/unlinked explicitly.
  constexpr bool kUseMmapVector =
      std::is_same_v<VECTOR_T,
                     mmap_vector<std::tuple<vid_t, vid_t, EDATA_T>>> ||
      std::is_same_v<VECTOR_T, mmap_vector<std::tuple<vid_t, vid_t, size_t>>>;

  auto src_label_name = schema_.get_vertex_label_name(src_label_id);
  auto dst_label_name = schema_.get_vertex_label_name(dst_label_id);
  auto edge_label_name = schema_.get_edge_label_name(e_label_id);
  auto edge_column_mappings = loading_config_.GetEdgeColumnMappings(
      src_label_id, dst_label_id, e_label_id);
  auto src_dst_col_pair = loading_config_.GetEdgeSrcDstCol(
      src_label_id, dst_label_id, e_label_id);
  if (src_dst_col_pair.first.size() != 1 ||
      src_dst_col_pair.second.size() != 1) {
    LOG(FATAL) << "We currently only support one src primary key and one "
                  "dst primary key";
  }
  size_t src_col_ind = src_dst_col_pair.first[0].second;
  size_t dst_col_ind = src_dst_col_pair.second[0].second;
  CHECK(src_col_ind != dst_col_ind);
  check_edge_invariant(schema_, edge_column_mappings, src_col_ind,
                       dst_col_ind, src_label_id, dst_label_id, e_label_id);

  const auto& src_indexer = basic_fragment_loader_.GetLFIndexer(src_label_id);
  const auto& dst_indexer = basic_fragment_loader_.GetLFIndexer(dst_label_id);

  // hardware_concurrency() is allowed to return 0 when the value cannot be
  // determined. Clamp to 1 so there is always at least one consumer draining
  // the bounded queue (with zero consumers the producers would block forever).
  const unsigned thread_num =
      std::max(1u, std::thread::hardware_concurrency());

  std::vector<VECTOR_T> parsed_edges_vec(thread_num);
  if constexpr (kUseMmapVector) {
    const auto& work_dir = basic_fragment_loader_.work_dir();
    for (unsigned i = 0; i < thread_num; ++i) {
      parsed_edges_vec[i].open(runtime_dir(work_dir) + "/" + src_label_name +
                               "_" + dst_label_name + "_" + edge_label_name +
                               "_" + std::to_string(i) + ".tmp");
      parsed_edges_vec[i].reserve(4096);
    }
  }

  std::vector<std::atomic<int32_t>> ie_degree(dst_indexer.size());
  std::vector<std::atomic<int32_t>> oe_degree(src_indexer.size());
  // Explicitly reset every counter to zero before the workers start.
  for (auto& deg : ie_degree) {
    deg.store(0);
  }
  for (auto& deg : oe_degree) {
    deg.store(0);
  }
  VLOG(10) << "src indexer size: " << src_indexer.size()
           << " dst indexer size: " << dst_indexer.size();

  grape::BlockingQueue<std::shared_ptr<arrow::RecordBatch>> queue;
  queue.SetLimit(1024);
  // Holds on to every string column (one slot per producer) so the strings
  // are not released when their record batch is released; cleared only after
  // PutEdges() has run.
  std::vector<std::vector<std::shared_ptr<arrow::Array>>> string_columns(
      thread_num);
  if constexpr (std::is_same_v<EDATA_T, RecordView>) {
    basic_fragment_loader_.init_edge_table(src_label_id, dst_label_id,
                                           e_label_id);
  }
  // Next free row in the edge table (RecordView only); shared across inputs.
  std::atomic<size_t> offset(0);
  // Writers of table columns take it shared; resizing the table takes it
  // exclusively.
  std::shared_mutex rw_mutex;

  for (const auto& filename : e_files) {
    auto record_batch_supplier_vec = supplier_creator(
        src_label_id, dst_label_id, e_label_id, filename, loading_config_,
        static_cast<int>(parsed_edges_vec.size()));
    const size_t producer_num = record_batch_supplier_vec.size();
    // Producer `idx` writes to string_columns[idx]; guarantee the slot exists
    // even if the creator returned more suppliers than threads requested.
    // All threads of previous inputs are joined, so resizing here is safe.
    if (string_columns.size() < producer_num) {
      string_columns.resize(producer_num);
    }
    queue.SetProducerNum(producer_num);

    // Threads are created and joined per input. Keeping the vector local to
    // the iteration guarantees we never call join() on a thread that was
    // already joined for an earlier input (which throws std::system_error).
    std::vector<std::thread> work_threads;

    // Producers: pull batches from the suppliers and push them to the queue.
    for (size_t i = 0; i < producer_num; ++i) {
      work_threads.emplace_back(
          [&](size_t idx) {
            auto& string_column = string_columns[idx];
            auto& record_batch_supplier = record_batch_supplier_vec[idx];
            bool first_batch = true;
            while (true) {
              auto record_batch = record_batch_supplier->GetNextBatch();
              if (!record_batch) {
                queue.DecProducerNum();
                break;
              }
              if (first_batch) {
                auto header = record_batch->schema()->field_names();
                auto schema_column_names = schema_.get_edge_property_names(
                    src_label_id, dst_label_id, e_label_id);
                CHECK(schema_column_names.size() + 2 == header.size())
                    << "schema size: " << schema_column_names.size()
                    << " neq header size: " << header.size();
                first_batch = false;
              }
              for (int col = 0; col < record_batch->num_columns(); ++col) {
                // Hold the column in a named local: type() refers into it.
                auto column = record_batch->column(col);
                if (column->type()->Equals(arrow::utf8()) ||
                    column->type()->Equals(arrow::large_utf8())) {
                  string_column.emplace_back(std::move(column));
                }
              }
              queue.Put(record_batch);
            }
          },
          i);
    }

    // Consumers: turn batches into edges stored in parsed_edges_vec[idx].
    const size_t consumer_num =
        std::min<size_t>(8 * producer_num, thread_num);
    for (size_t i = 0; i < consumer_num; ++i) {
      work_threads.emplace_back(
          [&](size_t idx) {
            auto& parsed_edges = parsed_edges_vec[idx];
            // Loop-invariant schema lookup, done once per consumer thread.
            auto edge_property = schema_.get_edge_property(
                src_label_id, dst_label_id, e_label_id);
            while (true) {
              std::shared_ptr<arrow::RecordBatch> record_batch{nullptr};
              if (!queue.Get(record_batch)) {
                break;
              }
              if (!record_batch) {
                LOG(FATAL) << "get nullptr batch";
              }
              auto columns = record_batch->columns();
              // We assume the src_col and dst_col will always be put
              // at front.
              CHECK(columns.size() >= 2);
              auto src_col = columns[0];
              auto dst_col = columns[1];
              auto src_col_type = src_col->type();
              auto dst_col_type = dst_col->type();
              CHECK(check_primary_key_type(src_col_type))
                  << "unsupported src_col type: " << src_col_type->ToString();
              CHECK(check_primary_key_type(dst_col_type))
                  << "unsupported dst_col type: " << dst_col_type->ToString();
              // Validate before any property data is written.
              CHECK(src_col->length() == dst_col->length());
              std::vector<std::shared_ptr<arrow::Array>> property_cols(
                  columns.begin() + 2, columns.end());

              size_t offset_i = 0;
              if constexpr (std::is_same_v<EDATA_T, RecordView>) {
                auto casted_csr = dynamic_cast<DualCsr<RecordView>*>(
                    basic_fragment_loader_.get_csr(src_label_id, dst_label_id,
                                                   e_label_id));
                CHECK(casted_csr != nullptr);
                auto table = casted_csr->GetTable();
                CHECK(table.col_num() == property_cols.size());
                const size_t batch_len =
                    static_cast<size_t>(src_col->length());
                // Reserve a contiguous row range for this batch.
                offset_i = offset.fetch_add(batch_len);
                std::vector<size_t> offsets;
                offsets.reserve(batch_len);
                for (size_t r = 0; r < batch_len; ++r) {
                  offsets.emplace_back(offset_i + r);
                }
                const size_t required_rows = offset_i + batch_len;
                // Every read of row_num() happens under rw_mutex so it cannot
                // race with a concurrent resize().
                bool need_grow = false;
                {
                  std::shared_lock<std::shared_mutex> lock(rw_mutex);
                  need_grow = table.row_num() < required_rows;
                }
                if (need_grow) {
                  std::unique_lock<std::shared_mutex> lock(rw_mutex);
                  // Grow geometrically from the current size; re-checked
                  // under the exclusive lock so the table never shrinks.
                  size_t row_num = std::max<size_t>(table.row_num(), 1);
                  while (row_num < required_rows) {
                    row_num *= 2;
                  }
                  if (row_num > table.row_num()) {
                    table.resize(row_num);
                  }
                }
                {
                  std::shared_lock<std::shared_mutex> lock(rw_mutex);
                  for (size_t c = 0; c < table.col_num(); ++c) {
                    auto col = table.get_column_by_id(c);
                    auto chunked_array =
                        std::make_shared<arrow::ChunkedArray>(
                            property_cols[c]);
                    set_properties_column(col.get(), chunked_array, offsets);
                  }
                }
              }

              // Batches without property columns used to index
              // property_cols[0] on an empty vector (UB). Pass an empty
              // pointer instead; presumably _append_edges ignores it when the
              // edge carries no property — TODO confirm.
              std::shared_ptr<arrow::Array> first_property_col;
              if (!property_cols.empty()) {
                first_property_col = property_cols.front();
              }

              // Dispatch on the primary key type of the src column.
              if (src_col_type->Equals(arrow::int64())) {
                _append_edges<int64_t, EDATA_T, VECTOR_T>(
                    src_col, dst_col, src_indexer, dst_indexer,
                    first_property_col, edge_property, parsed_edges,
                    ie_degree, oe_degree, offset_i);
              } else if (src_col_type->Equals(arrow::uint64())) {
                _append_edges<uint64_t, EDATA_T, VECTOR_T>(
                    src_col, dst_col, src_indexer, dst_indexer,
                    first_property_col, edge_property, parsed_edges,
                    ie_degree, oe_degree, offset_i);
              } else if (src_col_type->Equals(arrow::int32())) {
                _append_edges<int32_t, EDATA_T, VECTOR_T>(
                    src_col, dst_col, src_indexer, dst_indexer,
                    first_property_col, edge_property, parsed_edges,
                    ie_degree, oe_degree, offset_i);
              } else if (src_col_type->Equals(arrow::uint32())) {
                _append_edges<uint32_t, EDATA_T, VECTOR_T>(
                    src_col, dst_col, src_indexer, dst_indexer,
                    first_property_col, edge_property, parsed_edges,
                    ie_degree, oe_degree, offset_i);
              } else {
                // must be string
                _append_edges<std::string_view, EDATA_T, VECTOR_T>(
                    src_col, dst_col, src_indexer, dst_indexer,
                    first_property_col, edge_property, parsed_edges,
                    ie_degree, oe_degree, offset_i);
              }
            }
          },
          i);
    }

    for (auto& t : work_threads) {
      t.join();
    }
    VLOG(10) << "Finish parsing edge file:" << filename << " for label "
             << src_label_name << " -> " << dst_label_name << " -> "
             << edge_label_name;
  }
  VLOG(10) << "Finish parsing edge file:" << e_files.size() << " for label "
           << src_label_name << " -> " << dst_label_name << " -> "
           << edge_label_name;

  // Snapshot the atomic counters into plain vectors for PutEdges().
  std::vector<int32_t> ie_deg(ie_degree.size());
  std::vector<int32_t> oe_deg(oe_degree.size());
  for (size_t idx = 0; idx < ie_deg.size(); ++idx) {
    ie_deg[idx] = ie_degree[idx].load();
  }
  for (size_t idx = 0; idx < oe_deg.size(); ++idx) {
    oe_deg[idx] = oe_degree[idx].load();
  }
  basic_fragment_loader_.PutEdges<EDATA_T, VECTOR_T>(
      src_label_id, dst_label_id, e_label_id, parsed_edges_vec, ie_deg,
      oe_deg, build_csr_in_mem_);
  // The string data is no longer needed once the edges have been put.
  string_columns.clear();

  size_t sum = 0;
  for (auto& edges : parsed_edges_vec) {
    sum += edges.size();
    if constexpr (kUseMmapVector) {
      // Remove the temporary backing file created above.
      edges.unlink();
    }
  }
  VLOG(10) << "Finish putting: " << sum << " edges";
}