in flex/storages/rt_mutable_graph/loading_config.cc [323:500]
// Parses a single edge entry of the loading configuration.
//
// Fields read from `node`:
//   - `type_triplet` (required): its `edge`, `source_vertex` and
//     `destination_vertex` labels; all of them must exist in `schema`.
//   - `source_vertex_mappings` / `destination_vertex_mappings` (optional):
//     columns identifying the endpoint vertices. When absent, column 0 is
//     used for the source and column 1 for the destination.
//   - `column_mappings` (optional): property column mappings. When absent an
//     empty mapping is recorded for the triplet.
//   - `inputs` (required): a non-empty sequence of scalar input paths.
//
// @param node              YAML node describing one edge mapping entry.
// @param schema            Schema used to validate labels and resolve ids.
// @param scheme            Data source scheme. For "file", every input is
//                          checked with access_file() and stored as its
//                          std::filesystem::canonical path; for any other
//                          scheme the input is stored prefixed with
//                          `data_location` (when that is non-empty).
// @param data_location     Location the input paths are relative to.
// @param files             Out: (src, dst, edge) label ids -> input paths.
//                          An existing entry for the same triplet is
//                          reported as a duplicate configuration.
// @param edge_mapping      Out: (src, dst, edge) label ids -> column mappings.
// @param edge_src_dst_col  Out: (src, dst, edge) label ids -> (source vertex
//                          columns, destination vertex columns).
// @return Status::OK() on success; otherwise a Status carrying
//         StatusCode::INVALID_IMPORT_FILE. The output maps may already have
//         been updated for this triplet when an error is returned.
static Status parse_edge_files(
    YAML::Node node, const Schema& schema, const std::string& scheme,
    const std::string& data_location,
    std::unordered_map<
        std::tuple<label_t, label_t, label_t>, std::vector<std::string>,
        boost::hash<std::tuple<label_t, label_t, label_t>>>& files,
    std::unordered_map<
        typename LoadingConfig::edge_triplet_type,
        std::vector<std::tuple<size_t, std::string, std::string>>,
        boost::hash<typename LoadingConfig::edge_triplet_type>>& edge_mapping,
    std::unordered_map<typename LoadingConfig::edge_triplet_type,
                       std::pair<std::vector<std::pair<std::string, size_t>>,
                                 std::vector<std::pair<std::string, size_t>>>,
                       boost::hash<typename LoadingConfig::edge_triplet_type>>&
        edge_src_dst_col) {
  if (!node["type_triplet"]) {
    LOG(ERROR) << "edge [type_triplet] is not set properly";
    return Status(StatusCode::INVALID_IMPORT_FILE,
                  "edge [type_triplet] is not set properly");
  }
  auto triplet_node = node["type_triplet"];
  std::string src_label, dst_label, edge_label;
  if (!get_scalar(triplet_node, "edge", edge_label)) {
    std::stringstream ss;
    ss << "Field [edge] is not set for edge [" << triplet_node << "]";
    auto err_str = ss.str();
    LOG(ERROR) << err_str;
    return Status(StatusCode::INVALID_IMPORT_FILE, err_str);
  }
  if (!get_scalar(triplet_node, "source_vertex", src_label)) {
    LOG(ERROR) << "Field [source_vertex] is not set for edge [" << edge_label
               << "]";
    return Status(
        StatusCode::INVALID_IMPORT_FILE,
        "Field [source_vertex] is not set for edge [" + edge_label + "]");
  }
  if (!get_scalar(triplet_node, "destination_vertex", dst_label)) {
    LOG(ERROR) << "Field [destination_vertex] is not set for edge ["
               << edge_label << "]";
    return Status(
        StatusCode::INVALID_IMPORT_FILE,
        "Field [destination_vertex] is not set for edge [" + edge_label + "]");
  }
  {
    // check whether src_label, dst_label and edge_label exist in schema
    if (!schema.has_vertex_label(src_label)) {
      LOG(ERROR) << "Vertex label [" << src_label << "] does not exist in "
                 << "the schema";
      return Status(StatusCode::INVALID_IMPORT_FILE, "Vertex label [" +
                                                         src_label +
                                                         "] does not exist in "
                                                         "the schema");
    }
    if (!schema.has_vertex_label(dst_label)) {
      LOG(ERROR) << "Vertex label [" << dst_label << "] does not exist in "
                 << "the schema";
      return Status(StatusCode::INVALID_IMPORT_FILE, "Vertex label [" +
                                                         dst_label +
                                                         "] does not exist in "
                                                         "the schema");
    }
    if (!schema.has_edge_label(src_label, dst_label, edge_label)) {
      LOG(ERROR) << "Edge label [" << edge_label << "] does not exist in "
                 << "the schema";
      return Status(StatusCode::INVALID_IMPORT_FILE, "Edge label [" +
                                                         edge_label +
                                                         "] does not exist in "
                                                         "the schema");
    }
  }
  const auto src_label_id = schema.get_vertex_label_id(src_label);
  const auto dst_label_id = schema.get_vertex_label_id(dst_label);
  const auto edge_label_id = schema.get_edge_label_id(edge_label);
  // All three output maps are keyed by the same label-id triplet.
  const auto triplet =
      std::make_tuple(src_label_id, dst_label_id, edge_label_id);

  if (files.find(triplet) != files.end()) {
    LOG(ERROR) << "Edge [" << edge_label << "] between [" << src_label
               << "] and "
               << "[" << dst_label << "] loading config already exists";
    return Status(StatusCode::INVALID_IMPORT_FILE,
                  "Edge [" + edge_label + "] between [" + src_label + "] and " +
                      "[" + dst_label + "] loading config already exists");
  }

  // parse the vertex mapping. currently we only need one column to identify the
  // vertex.
  {
    std::vector<std::pair<std::string, size_t>> src_columns, dst_columns;
    if (!fetch_src_dst_column_mapping(schema, node, src_label_id,
                                      "source_vertex_mappings", src_columns)) {
      LOG(WARNING) << "Field [source_vertex_mappings] is not set for edge ["
                   << src_label << "->[" << edge_label << "]->" << dst_label
                   << "], using default choice: column_id 0";
      src_columns.emplace_back("", 0);
    }
    if (!fetch_src_dst_column_mapping(schema, node, dst_label_id,
                                      "destination_vertex_mappings",
                                      dst_columns)) {
      LOG(WARNING) << "Field [destination_vertex_mappings] is not set for edge ["
                   << src_label << "->[" << edge_label << "]->" << dst_label
                   << "], using default choice: column_id 1";
      dst_columns.emplace_back("", 1);
    }
    VLOG(10) << "src: " << src_label << ", dst: " << dst_label
             << " src_column size: " << src_columns.size() << " dst_column "
             << dst_columns.size();
    edge_src_dst_col[triplet] =
        std::make_pair(std::move(src_columns), std::move(dst_columns));
  }

  if (node["column_mappings"]) {
    auto column_mappings = node["column_mappings"];
    auto& triplet_mapping = edge_mapping[triplet];
    if (!parse_column_mappings(
            column_mappings, schema, edge_label, triplet_mapping,
            [&](const std::string& edge_label_name,
                const std::string& property_name) {
              return schema.edge_has_property(src_label, dst_label,
                                              edge_label_name, property_name);
            })) {
      LOG(ERROR) << "Failed to parse edge mapping";
      return Status(StatusCode::INVALID_IMPORT_FILE,
                    "Failed to parse edge mapping");
    }
    // Report the size of the mapping just parsed for this triplet, not the
    // number of triplets seen so far.
    VLOG(10) << "Successfully parsed edge mapping size: "
             << triplet_mapping.size();
  } else {
    VLOG(10) << "No edge column mapping is given, use default mapping";
    // use default mapping
    edge_mapping.emplace(
        triplet, std::vector<std::tuple<size_t, std::string, std::string>>{});
  }

  YAML::Node files_node = node["inputs"];
  if (!files_node) {
    LOG(ERROR) << "No edge files found for edge " << edge_label << "...";
    return Status(StatusCode::INVALID_IMPORT_FILE,
                  "No edge files found for edge " + edge_label + "...");
  }
  if (!files_node.IsSequence()) {
    LOG(ERROR) << "files is not sequence";
    return Status(StatusCode::INVALID_IMPORT_FILE, "files is not sequence");
  }
  if (files_node.size() == 0) {
    // An empty list supplies no data, just like a missing `inputs` field. It
    // would also leave no `files` entry, defeating the duplicate check above.
    LOG(ERROR) << "No edge files found for edge " << edge_label << "...";
    return Status(StatusCode::INVALID_IMPORT_FILE,
                  "No edge files found for edge " + edge_label + "...");
  }
  for (const YAML::Node& file_node : files_node) {
    // Reject maps, sequences and nulls up front: as<std::string>() would
    // throw for the former and silently yield a bogus path for a null.
    if (!file_node.IsScalar()) {
      LOG(ERROR) << "input of edge [" << edge_label
                 << "] is not a file path: " << file_node;
      return Status(StatusCode::INVALID_IMPORT_FILE,
                    "The input for edge [" + edge_label +
                        "] is not a file path");
    }
    std::string file_path = file_node.as<std::string>();
    if (file_path.empty()) {
      LOG(ERROR) << "file path is empty";
      return Status(StatusCode::INVALID_IMPORT_FILE,
                    "The input for edge [" + edge_label + "] is empty");
    }
    if (scheme == "file") {
      if (!access_file(data_location, file_path)) {
        LOG(ERROR) << "edge file - [" << file_path << "] file not found...";
        return Status(StatusCode::INVALID_IMPORT_FILE,
                      "edge file - [" + file_path + "] file not found...");
      }
      // Resolve once, with the non-throwing overload, so a resolution failure
      // is reported through Status instead of escaping as filesystem_error.
      std::error_code ec;
      const std::filesystem::path canonical_path =
          std::filesystem::canonical(file_path, ec);
      if (ec) {
        LOG(ERROR) << "edge file - [" << file_path
                   << "] cannot be resolved: " << ec.message();
        return Status(StatusCode::INVALID_IMPORT_FILE,
                      "edge file - [" + file_path +
                          "] cannot be resolved: " + ec.message());
      }
      VLOG(10) << "src " << src_label << " dst " << dst_label << " edge "
               << edge_label << " path " << canonical_path;
      files[triplet].emplace_back(canonical_path.string());
    } else {
      // append file_path to data_location, with exactly one '/' between them
      // so that e.g. "oss://bucket/dir/" does not yield "oss://bucket/dir//f".
      if (!data_location.empty()) {
        if (data_location.back() == '/') {
          file_path = data_location + file_path;
        } else {
          file_path = data_location + "/" + file_path;
        }
      }
      files[triplet].emplace_back(std::move(file_path));
    }
  }
  return Status::OK();
}