static Status parse_edge_files()

in flex/storages/rt_mutable_graph/loading_config.cc [323:500]


static Status parse_edge_files(
    YAML::Node node, const Schema& schema, const std::string& scheme,
    const std::string& data_location,
    std::unordered_map<
        std::tuple<label_t, label_t, label_t>, std::vector<std::string>,
        boost::hash<std::tuple<label_t, label_t, label_t>>>& files,
    std::unordered_map<
        typename LoadingConfig::edge_triplet_type,
        std::vector<std::tuple<size_t, std::string, std::string>>,
        boost::hash<typename LoadingConfig::edge_triplet_type>>& edge_mapping,
    std::unordered_map<typename LoadingConfig::edge_triplet_type,
                       std::pair<std::vector<std::pair<std::string, size_t>>,
                                 std::vector<std::pair<std::string, size_t>>>,
                       boost::hash<typename LoadingConfig::edge_triplet_type>>&
        edge_src_dst_col) {
  if (!node["type_triplet"]) {
    LOG(ERROR) << "edge [type_triplet] is not set properly";
    return Status(StatusCode::INVALID_IMPORT_FILE,
                  "edge [type_triplet] is not set properly");
  }
  auto triplet_node = node["type_triplet"];
  std::string src_label, dst_label, edge_label;
  if (!get_scalar(triplet_node, "edge", edge_label)) {
    std::stringstream ss;
    ss << "Field [edge] is not set for edge [" << triplet_node << "]";
    auto err_str = ss.str();
    LOG(ERROR) << err_str;
    return Status(StatusCode::INVALID_IMPORT_FILE, err_str);
  }
  if (!get_scalar(triplet_node, "source_vertex", src_label)) {
    LOG(ERROR) << "Field [source_vertex] is not set for edge [" << edge_label
               << "]";
    return Status(
        StatusCode::INVALID_IMPORT_FILE,
        "Field [source_vertex] is not set for edge [" + edge_label + "]");
  }
  if (!get_scalar(triplet_node, "destination_vertex", dst_label)) {
    LOG(ERROR) << "Field [destination_vertex] is not set for edge ["
               << edge_label << "]";
    return Status(
        StatusCode::INVALID_IMPORT_FILE,
        "Field [destination_vertex] is not set for edge [" + edge_label + "]");
  }

  {
    // check whether src_label, dst_label and edge_label exist in schema
    if (!schema.has_vertex_label(src_label)) {
      LOG(ERROR) << "Vertex label [" << src_label << "] does not exist in "
                 << "the schema";
      return Status(StatusCode::INVALID_IMPORT_FILE, "Vertex label [" +
                                                         src_label +
                                                         "] does not exist in "
                                                         "the schema");
    }
    if (!schema.has_vertex_label(dst_label)) {
      LOG(ERROR) << "Vertex label [" << dst_label << "] does not exist in "
                 << "the schema";
      return Status(StatusCode::INVALID_IMPORT_FILE, "Vertex label [" +
                                                         dst_label +
                                                         "] does not exist in "
                                                         "the schema");
    }
    if (!schema.has_edge_label(src_label, dst_label, edge_label)) {
      LOG(ERROR) << "Edge label [" << edge_label << "] does not exist in "
                 << "the schema";
      return Status(StatusCode::INVALID_IMPORT_FILE, "Edge label [" +
                                                         edge_label +
                                                         "] does not exist in "
                                                         "the schema");
    }
  }
  auto src_label_id = schema.get_vertex_label_id(src_label);
  auto dst_label_id = schema.get_vertex_label_id(dst_label);
  auto edge_label_id = schema.get_edge_label_id(edge_label);

  if (files.find(std::make_tuple(src_label_id, dst_label_id, edge_label_id)) !=
      files.end()) {
    LOG(ERROR) << "Edge [" << edge_label << "] between [" << src_label
               << "] and "
               << "[" << dst_label << "] loading config already exists";
    return Status(StatusCode::INVALID_IMPORT_FILE,
                  "Edge [" + edge_label + "] between [" + src_label + "] and " +
                      "[" + dst_label + "] loading config already exists");
  }

  // parse the vertex mapping. currently we only need one column to identify the
  // vertex.
  {
    std::vector<std::pair<std::string, size_t>> src_columns, dst_columns;

    if (!fetch_src_dst_column_mapping(schema, node, src_label_id,
                                      "source_vertex_mappings", src_columns)) {
      LOG(WARNING) << "Field [source_vertex_mappings] is not set for edge ["
                   << src_label << "->[" << edge_label << "]->" << dst_label
                   << "], using default choice: column_id 0";
      src_columns.emplace_back("", 0);
    }
    if (!fetch_src_dst_column_mapping(schema, node, dst_label_id,
                                      "destination_vertex_mappings",
                                      dst_columns)) {
      LOG(WARNING) << "Field [destination_vertex_mappings] is not set for edge["
                   << src_label << "->[" << edge_label << "]->" << dst_label
                   << "], using default choice: column_id 1";
      dst_columns.emplace_back("", 1);
    }

    VLOG(10) << "src: " << src_label << ", dst: " << dst_label
             << " src_column size:  " << src_columns.size() << " dst_column "
             << dst_columns.size();
    edge_src_dst_col[std::tuple{src_label_id, dst_label_id, edge_label_id}] =
        std::pair{src_columns, dst_columns};
  }

  if (node["column_mappings"]) {
    auto column_mappings = node["column_mappings"];
    if (!parse_column_mappings(
            column_mappings, schema, edge_label,
            edge_mapping[std::tuple{src_label_id, dst_label_id, edge_label_id}],
            [&](const std::string& edge_label_name,
                const std::string& property_name) {
              return schema.edge_has_property(src_label, dst_label,
                                              edge_label_name, property_name);
            })) {
      LOG(ERROR) << "Failed to parse edge mapping";
      return Status(StatusCode::INVALID_IMPORT_FILE,
                    "Failed to parse edge mapping");
    }
    VLOG(10) << "Successfully parsed edge mapping size: "
             << edge_mapping.size();
  } else {
    VLOG(10) << "No edge column mapping is given, use default mapping";
    // use default mapping
    edge_mapping.emplace(
        std::tuple{src_label_id, dst_label_id, edge_label_id},
        std::vector<std::tuple<size_t, std::string, std::string>>{});
  }

  YAML::Node files_node = node["inputs"];
  if (files_node) {
    if (!files_node.IsSequence()) {
      LOG(ERROR) << "files is not sequence";
      return Status(StatusCode::INVALID_IMPORT_FILE, "files is not sequence");
    }
    int num = files_node.size();
    for (int i = 0; i < num; ++i) {
      std::string file_path = files_node[i].as<std::string>();
      if (file_path.empty()) {
        LOG(ERROR) << "file path is empty";
        return Status(StatusCode::INVALID_IMPORT_FILE,
                      "The input for edge [" + edge_label + "] is empty");
      }
      if (scheme == "file") {
        if (!access_file(data_location, file_path)) {
          LOG(ERROR) << "edge file - [" << file_path << "] file not found...";
          return Status(StatusCode::INVALID_IMPORT_FILE,
                        "edge file - [" + file_path + "] file not found...");
        }
        std::filesystem::path path(file_path);
        VLOG(10) << "src " << src_label << " dst " << dst_label << " edge "
                 << edge_label << " path " << std::filesystem::canonical(path);
        files[std::tuple{src_label_id, dst_label_id, edge_label_id}]
            .emplace_back(std::filesystem::canonical(path));
      } else {
        // append file_path to data_location
        if (!data_location.empty()) {
          file_path = data_location + "/" + file_path;
        }
        files[std::tuple{src_label_id, dst_label_id, edge_label_id}]
            .emplace_back(file_path);
      }
    }
  } else {
    LOG(ERROR) << "No edge files found for edge " << edge_label << "...";
    return Status(StatusCode::INVALID_IMPORT_FILE,
                  "No edge files found for edge " + edge_label + "...");
  }
  return Status::OK();
}