in flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc [254:450]
void AbstractArrowFragmentLoader::AddEdgesRecordBatch(
label_t src_label_i, label_t dst_label_i, label_t edge_label_i,
const std::vector<std::string>& filenames,
std::function<std::vector<std::shared_ptr<IRecordBatchSupplier>>(
label_t, label_t, label_t, const std::string&, const LoadingConfig&,
int)>
supplier_creator) {
auto src_label_name = schema_.get_vertex_label_name(src_label_i);
auto dst_label_name = schema_.get_vertex_label_name(dst_label_i);
auto edge_label_name = schema_.get_edge_label_name(edge_label_i);
if (filenames.size() <= 0) {
LOG(FATAL) << "No edge files found for src label: " << src_label_name
<< " dst label: " << dst_label_name
<< " edge label: " << edge_label_name;
}
if (filenames.size() <= 0) {
LOG(FATAL) << "No edge files found for src label: " << src_label_name
<< " dst label: " << dst_label_name
<< " edge label: " << edge_label_name;
}
VLOG(10) << "Init edges src label: " << src_label_name
<< " dst label: " << dst_label_name
<< " edge label: " << edge_label_name
<< " filenames: " << filenames.size();
auto& property_types = schema_.get_edge_properties(
src_label_name, dst_label_name, edge_label_name);
size_t col_num = property_types.size();
EdgeStrategy oe_strategy = schema_.get_outgoing_edge_strategy(
src_label_name, dst_label_name, edge_label_name);
EdgeStrategy ie_strategy = schema_.get_incoming_edge_strategy(
src_label_name, dst_label_name, edge_label_name);
bool oe_mutable = schema_.outgoing_edge_mutable(
src_label_name, dst_label_name, edge_label_name);
bool ie_mutable = schema_.incoming_edge_mutable(
src_label_name, dst_label_name, edge_label_name);
if (col_num == 0) {
auto dual_csr = new DualCsr<grape::EmptyType>(oe_strategy, ie_strategy,
oe_mutable, ie_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
basic_fragment_loader_.AddNoPropEdgeBatch<grape::EmptyType>(
src_label_i, dst_label_i, edge_label_i);
} else {
addEdgesRecordBatchImpl<grape::EmptyType>(
src_label_i, dst_label_i, edge_label_i, filenames, supplier_creator);
}
} else if (col_num == 1) {
if (property_types[0] == PropertyType::kBool) {
auto dual_csr =
new DualCsr<bool>(oe_strategy, ie_strategy, oe_mutable, ie_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
basic_fragment_loader_.AddNoPropEdgeBatch<bool>(
src_label_i, dst_label_i, edge_label_i);
} else {
addEdgesRecordBatchImpl<bool>(src_label_i, dst_label_i, edge_label_i,
filenames, supplier_creator);
}
} else if (property_types[0] == PropertyType::kDate) {
auto dual_csr =
new DualCsr<Date>(oe_strategy, ie_strategy, oe_mutable, ie_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
basic_fragment_loader_.AddNoPropEdgeBatch<Date>(
src_label_i, dst_label_i, edge_label_i);
} else {
addEdgesRecordBatchImpl<Date>(src_label_i, dst_label_i, edge_label_i,
filenames, supplier_creator);
}
} else if (property_types[0] == PropertyType::kInt32) {
auto dual_csr = new DualCsr<int32_t>(oe_strategy, ie_strategy, oe_mutable,
ie_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
basic_fragment_loader_.AddNoPropEdgeBatch<int32_t>(
src_label_i, dst_label_i, edge_label_i);
} else {
addEdgesRecordBatchImpl<int32_t>(src_label_i, dst_label_i, edge_label_i,
filenames, supplier_creator);
}
} else if (property_types[0] == PropertyType::kUInt32) {
auto dual_csr = new DualCsr<uint32_t>(oe_strategy, ie_strategy,
oe_mutable, ie_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
basic_fragment_loader_.AddNoPropEdgeBatch<uint32_t>(
src_label_i, dst_label_i, edge_label_i);
} else {
addEdgesRecordBatchImpl<uint32_t>(src_label_i, dst_label_i,
edge_label_i, filenames,
supplier_creator);
}
} else if (property_types[0] == PropertyType::kInt64) {
auto dual_csr = new DualCsr<int64_t>(oe_strategy, ie_strategy, oe_mutable,
ie_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
basic_fragment_loader_.AddNoPropEdgeBatch<int64_t>(
src_label_i, dst_label_i, edge_label_i);
} else {
addEdgesRecordBatchImpl<int64_t>(src_label_i, dst_label_i, edge_label_i,
filenames, supplier_creator);
}
} else if (property_types[0] == PropertyType::kUInt64) {
auto dual_csr = new DualCsr<uint64_t>(oe_strategy, ie_strategy,
oe_mutable, ie_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
basic_fragment_loader_.AddNoPropEdgeBatch<uint64_t>(
src_label_i, dst_label_i, edge_label_i);
} else {
addEdgesRecordBatchImpl<uint64_t>(src_label_i, dst_label_i,
edge_label_i, filenames,
supplier_creator);
}
} else if (property_types[0] == PropertyType::kDouble) {
auto dual_csr =
new DualCsr<double>(oe_strategy, ie_strategy, oe_mutable, ie_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
basic_fragment_loader_.AddNoPropEdgeBatch<double>(
src_label_i, dst_label_i, edge_label_i);
} else {
addEdgesRecordBatchImpl<double>(src_label_i, dst_label_i, edge_label_i,
filenames, supplier_creator);
}
} else if (property_types[0] == PropertyType::kFloat) {
auto dual_csr =
new DualCsr<float>(oe_strategy, ie_strategy, oe_mutable, ie_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
basic_fragment_loader_.AddNoPropEdgeBatch<float>(
src_label_i, dst_label_i, edge_label_i);
} else {
addEdgesRecordBatchImpl<float>(src_label_i, dst_label_i, edge_label_i,
filenames, supplier_creator);
}
} else if (property_types[0].type_enum ==
impl::PropertyTypeImpl::kVarChar ||
property_types[0].type_enum ==
impl::PropertyTypeImpl::kStringView) {
// Both varchar and string are treated as string. For String, we use the
// default max length defined in PropertyType::GetStringDefaultMaxLength()
uint16_t max_length = PropertyType::GetStringDefaultMaxLength();
if (property_types[0].type_enum == impl::PropertyTypeImpl::kVarChar) {
max_length = property_types[0].additional_type_info.max_length;
}
auto dual_csr = new DualCsr<std::string_view>(
oe_strategy, ie_strategy, max_length, oe_mutable, ie_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
basic_fragment_loader_.AddNoPropEdgeBatch<std::string_view>(
src_label_i, dst_label_i, edge_label_i);
} else {
addEdgesRecordBatchImpl<std::string_view>(src_label_i, dst_label_i,
edge_label_i, filenames,
supplier_creator);
}
} else {
LOG(FATAL) << "Unsupported edge property type." << property_types[0];
}
} else {
const auto& props = schema_.get_edge_properties(
src_label_name, dst_label_name, edge_label_name);
const auto& prop_names = schema_.get_edge_property_names(
src_label_name, dst_label_name, edge_label_name);
auto dual_csr =
new DualCsr<RecordView>(oe_strategy, ie_strategy, prop_names, props, {},
ie_mutable, oe_mutable);
basic_fragment_loader_.set_csr(src_label_i, dst_label_i, edge_label_i,
dual_csr);
if (filenames.empty()) {
LOG(FATAL) << "No edge files found for src label: " << src_label_name
<< " dst label: " << dst_label_name
<< " edge label: " << edge_label_name;
} else {
addEdgesRecordBatchImpl<RecordView>(
src_label_i, dst_label_i, edge_label_i, filenames, supplier_creator);
}
}
}