flex/engines/graph_db/database/graph_db_operations.cc (686 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string>
#include <unordered_map>
#include <vector>
#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/database/graph_db_operations.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/utils/service_utils.h"
#include "utils/result.h"
#include "utils/service_utils.h"
namespace gs {
bool check_primary_key_value_valid(
const rapidjson::Value& vertex_json,
const std::string& pk_filed_name = "primary_key_values") {
LOG(INFO) << "check_primary_key_value_valid for " << pk_filed_name << " "
<< gs::rapidjson_stringify(vertex_json);
if (vertex_json.HasMember(pk_filed_name)) {
if (vertex_json[pk_filed_name].IsArray()) {
return vertex_json[pk_filed_name].Size() == 1 &&
vertex_json[pk_filed_name][0].HasMember("value");
}
return true;
}
LOG(INFO) << "check_primary_key_value_valid failed";
return false;
}
Result<std::string> GraphDBOperations::CreateVertex(
GraphDBSession& session, rapidjson::Document&& input_json) {
std::vector<VertexData> vertex_data;
std::vector<EdgeData> edge_data;
// Check if the input json contains vertex_request and edge_request
if (input_json.HasMember("vertex_request") == false ||
input_json["vertex_request"].IsArray() == false ||
input_json["vertex_request"].Size() == 0 ||
(input_json.HasMember("edge_request") &&
input_json["edge_request"].IsArray() == false)) {
return Result<std::string>(gs::Status(
StatusCode::INVALID_SCHEMA,
"Invalid input json, vertex_request and edge_request should be array "
"and not empty"));
}
const Schema& schema = session.schema();
// input vertex data and edge data
try {
// vertex data
for (auto& vertex_insert : input_json["vertex_request"].GetArray()) {
vertex_data.push_back(inputVertex(vertex_insert, schema, session));
}
// edge data
for (auto& edge_insert : input_json["edge_request"].GetArray()) {
edge_data.push_back(inputEdge(edge_insert, schema, session));
}
LOG(INFO) << "CreateVertex edge_data: " << edge_data.size();
} catch (std::exception& e) {
return Result<std::string>(
gs::Status(StatusCode::INVALID_SCHEMA,
" Bad input parameter : " + std::string(e.what())));
}
auto insert_result =
insertVertex(std::move(vertex_data), std::move(edge_data), session);
if (insert_result.ok()) {
rapidjson::Document result(rapidjson::kObjectType);
result.AddMember("message", "Vertex data is successfully inserted",
result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(insert_result);
}
Result<std::string> GraphDBOperations::CreateEdge(
GraphDBSession& session, rapidjson::Document&& input_json) {
std::vector<VertexData> vertex_data;
std::vector<EdgeData> edge_data;
// Check if the input json contains edge_request
if (input_json.IsArray() == false || input_json.Size() == 0) {
return Result<std::string>(gs::Status(
StatusCode::INVALID_SCHEMA,
"Invalid input json, edge_request should be array and not empty"));
}
const Schema& schema = session.schema();
// input edge data
try {
for (auto& edge_insert : input_json.GetArray()) {
edge_data.push_back(inputEdge(edge_insert, schema, session));
}
} catch (std::exception& e) {
return Result<std::string>(
gs::Status(StatusCode::INVALID_SCHEMA,
" Bad input parameter : " + std::string(e.what())));
}
auto insert_result = insertEdge(std::move(edge_data), session);
if (insert_result.ok()) {
rapidjson::Document result(rapidjson::kObjectType);
result.AddMember("message", "Edge data is successfully inserted",
result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(insert_result);
}
Result<std::string> GraphDBOperations::UpdateVertex(
GraphDBSession& session, rapidjson::Document&& input_json) {
std::vector<VertexData> vertex_data;
const Schema& schema = session.schema();
// input vertex data
try {
if (!input_json.IsObject() || !input_json.HasMember("vertex_request")) {
return Result<std::string>(
gs::Status(StatusCode::INVALID_SCHEMA,
"Invalid input json, update vertex request should be "
"object or vertex_request field not set"));
}
if (!input_json["vertex_request"].IsArray()) {
return Result<std::string>(
gs::Status(StatusCode::INVALID_SCHEMA,
"Invalid input json, vertex_request should be array"));
}
for (auto& vertex_update : input_json["vertex_request"].GetArray()) {
vertex_data.push_back(inputVertex(vertex_update, schema, session));
}
} catch (std::exception& e) {
return Result<std::string>(
gs::Status(StatusCode::INVALID_SCHEMA,
" Bad input parameter : " + std::string(e.what())));
}
auto update_result = updateVertex(std::move(vertex_data), session);
if (update_result.ok()) {
rapidjson::Document result(rapidjson::kObjectType);
result.AddMember("message", "Successfully update Vertex",
result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(update_result);
}
Result<std::string> GraphDBOperations::UpdateEdge(
GraphDBSession& session, rapidjson::Document&& input_json) {
std::vector<VertexData> vertex_data;
std::vector<EdgeData> edge_data;
const Schema& schema = session.schema();
// input edge data
try {
if (!input_json.IsArray()) {
return Result<std::string>(
gs::Status(StatusCode::INVALID_SCHEMA,
"Invalid input json, edge_request should be array"));
}
for (auto& edge_update : input_json.GetArray()) {
edge_data.push_back(inputEdge(edge_update, schema, session));
}
} catch (std::exception& e) {
return Result<std::string>(
gs::Status(StatusCode::INVALID_SCHEMA,
" Bad input parameter : " + std::string(e.what())));
}
auto update_result = updateEdge(std::move(edge_data), session);
if (update_result.ok()) {
rapidjson::Document result(rapidjson::kObjectType);
result.AddMember("message", "Successfully update Edge",
result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(update_result);
}
Result<std::string> GraphDBOperations::GetVertex(
GraphDBSession& session,
std::unordered_map<std::string, std::string>&& params) {
rapidjson::Document result(rapidjson::kObjectType);
std::vector<VertexData> vertex_data;
std::vector<std::string> property_names;
const Schema& schema = session.schema();
// input vertex data
VertexData vertex;
std::string label = params["label"];
result.AddMember("label", label, result.GetAllocator());
vertex.pk_value = Any(std::string(params["primary_key_value"]));
auto check_result =
checkVertexSchema(schema, vertex, label, property_names, true);
if (check_result.ok() == false) {
return Result<std::string>(check_result);
}
vertex_data.push_back(vertex);
auto get_result = getVertex(std::move(vertex_data), property_names, session,
result.GetAllocator());
if (get_result.ok()) {
result.AddMember("values", get_result.value(), result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(get_result.status());
}
Result<std::string> GraphDBOperations::GetEdge(
GraphDBSession& session,
std::unordered_map<std::string, std::string>&& params) {
rapidjson::Document result(rapidjson::kObjectType);
std::vector<VertexData> vertex_data;
std::vector<EdgeData> edge_data;
const Schema& schema = session.schema();
std::string property_name;
// input edge data
EdgeData edge;
std::string src_label = params["src_label"];
std::string dst_label = params["dst_label"];
std::string edge_label = params["edge_label"];
std::string src_pk_value = params["src_primary_key_value"];
std::string dst_pk_value = params["dst_primary_key_value"];
edge.src_pk_value = Any(src_pk_value);
edge.dst_pk_value = Any(dst_pk_value);
auto check_result = checkEdgeSchema(schema, edge, src_label, dst_label,
edge_label, property_name, true);
if (check_result.ok() == false) {
return Result<std::string>(check_result);
}
edge_data.push_back(edge);
result.AddMember("src_label", src_label, result.GetAllocator());
result.AddMember("dst_label", dst_label, result.GetAllocator());
result.AddMember("edge_label", edge_label, result.GetAllocator());
result.AddMember("src_primary_key_value", src_pk_value,
result.GetAllocator());
result.AddMember("dst_primary_key_value", dst_pk_value,
result.GetAllocator());
if (property_name.empty()) {
rapidjson::Value properties(rapidjson::kObjectType);
result.AddMember("properties", properties, result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
auto get_result = getEdge(std::move(edge_data), property_name, session,
result.GetAllocator());
if (get_result.ok()) {
result.AddMember("properties", get_result.value(), result.GetAllocator());
return Result<std::string>(rapidjson_stringify(result));
}
return Result<std::string>(get_result.status());
}
Result<std::string> GraphDBOperations::DeleteVertex(
GraphDBSession& session, rapidjson::Document&& input_json) {
// not implemented
return Result<std::string>(StatusCode::UNIMPLEMENTED,
"delete_vertex is not implemented");
}
Result<std::string> GraphDBOperations::DeleteEdge(
GraphDBSession& session, rapidjson::Document&& input_json) {
// not implemented
return Result<std::string>(StatusCode::UNIMPLEMENTED,
"delete_edge is not implemented");
}
VertexData GraphDBOperations::inputVertex(const rapidjson::Value& vertex_json,
const Schema& schema,
GraphDBSession& session) {
VertexData vertex;
std::string label = jsonToString(vertex_json["label"]);
if (!check_primary_key_value_valid(vertex_json, "primary_key_values") &&
!check_primary_key_value_valid(vertex_json, "primary_key_value")) {
throw std::runtime_error("primary_key_values/primary_key_value is invalid");
}
if (vertex_json.HasMember("primary_key_values")) {
vertex.pk_value =
Any(jsonToString(vertex_json["primary_key_values"][0]["value"]));
} else {
vertex.pk_value = Any(jsonToString(vertex_json["primary_key_value"]));
}
std::unordered_set<std::string> property_names;
std::vector<std::string> property_names_arr;
for (auto& property : vertex_json["properties"].GetArray()) {
auto name_string = jsonToString(property["name"]);
auto value_string = jsonToString(property["value"]);
if (property_names.find(name_string) != property_names.end()) {
throw std::runtime_error("property already exists in input properties: " +
name_string);
} else {
property_names.insert(name_string);
property_names_arr.push_back(name_string);
}
vertex.properties.emplace_back(value_string);
}
auto check_result =
checkVertexSchema(schema, vertex, label, property_names_arr);
if (check_result.ok() == false) {
throw std::runtime_error(check_result.error_message());
}
return vertex;
}
EdgeData GraphDBOperations::inputEdge(const rapidjson::Value& edge_json,
const Schema& schema,
GraphDBSession& session) {
EdgeData edge;
std::string src_label = jsonToString(edge_json["src_label"]);
std::string dst_label = jsonToString(edge_json["dst_label"]);
std::string edge_label = jsonToString(edge_json["edge_label"]);
if (!check_primary_key_value_valid(edge_json, "src_primary_key_values") &&
!check_primary_key_value_valid(edge_json, "src_primary_key_value")) {
throw std::runtime_error(
"src_primary_key_values/src_primary_key_value is invalid");
}
if (!check_primary_key_value_valid(edge_json, "dst_primary_key_values") &&
!check_primary_key_value_valid(edge_json, "dst_primary_key_value")) {
throw std::runtime_error("dst_primary_key_values is invalid");
}
if (edge_json.HasMember("src_primary_key_values")) {
edge.src_pk_value =
Any(jsonToString(edge_json["src_primary_key_values"][0]["value"]));
} else {
edge.src_pk_value = Any(jsonToString(edge_json["src_primary_key_value"]));
}
if (edge_json.HasMember("dst_primary_key_values")) {
edge.dst_pk_value =
Any(jsonToString(edge_json["dst_primary_key_values"][0]["value"]));
} else {
edge.dst_pk_value = Any(jsonToString(edge_json["dst_primary_key_value"]));
}
// Check that all parameters in the parameter
if (edge_json["properties"].Size() > 1) {
throw std::runtime_error(
"size should be 1(only support single property edge)");
}
std::string property_name = "";
if (edge_json["properties"].Size() == 1) {
edge.property_value =
Any(jsonToString(edge_json["properties"][0]["value"]));
property_name = edge_json["properties"][0]["name"].GetString();
}
auto check_result = checkEdgeSchema(schema, edge, src_label, dst_label,
edge_label, property_name);
if (check_result.ok() == false) {
throw std::runtime_error(check_result.error_message());
}
return edge;
}
Status GraphDBOperations::checkVertexSchema(
const Schema& schema, VertexData& vertex, const std::string& label,
std::vector<std::string>& input_property_names, bool is_get) {
try {
vertex.label_id = schema.get_vertex_label_id(label);
PropertyType colType =
std::get<0>(schema.get_vertex_primary_key(vertex.label_id)[0]);
vertex.pk_value = ConvertStringToAny(vertex.pk_value.to_string(), colType);
auto properties_type = schema.get_vertex_properties(vertex.label_id);
auto properties_name = schema.get_vertex_property_names(vertex.label_id);
if (vertex.properties.size() != properties_name.size() && is_get == false) {
throw std::runtime_error("properties size not match");
}
if (is_get) {
input_property_names = properties_name;
return Status::OK();
}
for (int col_index = 0; col_index < int(properties_name.size());
col_index++) {
if (input_property_names[col_index] != properties_name[col_index]) {
throw std::runtime_error(
"properties name not match, please check the order and name");
}
vertex.properties[col_index] = ConvertStringToAny(
vertex.properties[col_index].to_string(), properties_type[col_index]);
}
return Status::OK();
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA,
" Bad input parameter : " + std::string(e.what()));
}
}
Status GraphDBOperations::checkEdgeSchema(const Schema& schema, EdgeData& edge,
const std::string& src_label,
const std::string& dst_label,
const std::string& edge_label,
std::string& property_name,
bool is_get) {
try {
edge.src_label_id = schema.get_vertex_label_id(src_label);
edge.dst_label_id = schema.get_vertex_label_id(dst_label);
edge.edge_label_id = schema.get_edge_label_id(edge_label);
auto& result = schema.get_edge_property_names(
edge.src_label_id, edge.dst_label_id, edge.edge_label_id);
if (is_get) {
if (result.size() >= 1) {
property_name = result[0];
} else {
property_name = "";
}
} else {
// update or add
if (property_name != (result.size() >= 1 ? result[0] : "")) {
throw std::runtime_error("property name not match");
}
PropertyType colType = schema.get_edge_property(
edge.src_label_id, edge.dst_label_id, edge.edge_label_id);
edge.property_value =
ConvertStringToAny(edge.property_value.to_string(), colType);
}
edge.src_pk_value = ConvertStringToAny(
edge.src_pk_value.to_string(),
std::get<0>(schema.get_vertex_primary_key(edge.src_label_id)[0]));
edge.dst_pk_value = ConvertStringToAny(
edge.dst_pk_value.to_string(),
std::get<0>(schema.get_vertex_primary_key(edge.dst_label_id)[0]));
return Status::OK();
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA,
" Bad input parameter : " + std::string(e.what()));
}
}
Status GraphDBOperations::checkEdgeExistsWithInsert(
const std::vector<EdgeData>& edge_data, GraphDBSession& session) {
try {
auto txn = session.GetReadTransaction();
for (auto& edge : edge_data) {
vid_t src_vid, dst_vid;
if (txn.GetVertexIndex(edge.src_label_id, edge.src_pk_value, src_vid) ==
false ||
txn.GetVertexIndex(edge.dst_label_id, edge.dst_pk_value, dst_vid) ==
false) {
// It could be that this point is about to be inserted
continue;
}
// If the edge already exists, just report the error
for (auto edgeIt =
txn.GetOutEdgeIterator(edge.src_label_id, src_vid,
edge.dst_label_id, edge.edge_label_id);
edgeIt.IsValid(); edgeIt.Next()) {
if (edgeIt.GetNeighbor() == dst_vid) {
txn.Abort();
throw std::runtime_error("Fail to create edge: Edge already exists");
}
}
}
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA, e.what());
}
return Status::OK();
}
Status GraphDBOperations::checkEdgeExists(
const std::vector<EdgeData>& edge_data, GraphDBSession& session) {
try {
auto txn = session.GetReadTransaction();
for (auto& edge : edge_data) {
vid_t src_vid, dst_vid;
if (txn.GetVertexIndex(edge.src_label_id, edge.src_pk_value, src_vid) ==
false ||
txn.GetVertexIndex(edge.dst_label_id, edge.dst_pk_value, dst_vid) ==
false) {
txn.Abort();
throw std::runtime_error("Vertex not exists");
}
// If the edge already exists, just report the error
for (auto edgeIt =
txn.GetOutEdgeIterator(edge.src_label_id, src_vid,
edge.dst_label_id, edge.edge_label_id);
edgeIt.IsValid(); edgeIt.Next()) {
if (edgeIt.GetNeighbor() == dst_vid) {
txn.Abort();
throw std::runtime_error("Fail to create edge: Edge already exists");
}
}
}
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA, e.what());
}
return Status::OK();
}
Status GraphDBOperations::checkVertexExists(
const std::vector<VertexData>& vertex_data, GraphDBSession& session) {
try {
auto txn = session.GetReadTransaction();
for (auto& vertex : vertex_data) {
vid_t vid;
if (txn.GetVertexIndex(vertex.label_id, vertex.pk_value, vid)) {
txn.Abort();
throw std::runtime_error(
"Fail to create vertex: Vertex already exists");
}
}
txn.Commit();
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA, e.what());
}
return Status::OK();
}
Status GraphDBOperations::singleInsertVertex(
std::vector<VertexData>&& vertex_data, std::vector<EdgeData>&& edge_data,
GraphDBSession& session) {
try {
auto txnWrite = session.GetSingleVertexInsertTransaction();
for (auto& vertex : vertex_data) {
if (txnWrite.AddVertex(vertex.label_id, vertex.pk_value,
vertex.properties) == false) {
txnWrite.Abort();
throw std::runtime_error(
"Fail to create vertex; All inserts are rollbacked");
}
}
for (auto& edge : edge_data) {
if (txnWrite.AddEdge(edge.src_label_id, edge.src_pk_value,
edge.dst_label_id, edge.dst_pk_value,
edge.edge_label_id, edge.property_value) == false) {
txnWrite.Abort();
throw std::runtime_error(
"Fail to create edge; All inserts are rollbacked");
}
}
txnWrite.Commit();
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA, e.what());
}
return Status::OK();
}
Status GraphDBOperations::multiInsert(std::vector<VertexData>&& vertex_data,
std::vector<EdgeData>&& edge_data,
GraphDBSession& session) {
try {
auto txnWrite = session.GetInsertTransaction();
for (auto& vertex : vertex_data) {
if (txnWrite.AddVertex(vertex.label_id, vertex.pk_value,
vertex.properties) == false) {
txnWrite.Abort();
throw std::runtime_error(
"Fail to create vertex; All inserts are rollbacked");
}
}
for (auto& edge : edge_data) {
if (txnWrite.AddEdge(edge.src_label_id, edge.src_pk_value,
edge.dst_label_id, edge.dst_pk_value,
edge.edge_label_id, edge.property_value) == false) {
txnWrite.Abort();
throw std::runtime_error(
"Fail to create edge; All inserts are rollbacked");
}
}
txnWrite.Commit();
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA, e.what());
}
return Status::OK();
}
Status GraphDBOperations::insertVertex(std::vector<VertexData>&& vertex_data,
std::vector<EdgeData>&& edge_data,
GraphDBSession& session) {
auto check_result = checkVertexExists(vertex_data, session);
if (check_result.ok() == false) {
return check_result;
}
check_result = checkEdgeExistsWithInsert(edge_data, session);
if (check_result.ok() == false) {
return check_result;
}
if (vertex_data.size() == 1) {
return singleInsertVertex(std::move(vertex_data), std::move(edge_data),
session);
} else {
return multiInsert(std::move(vertex_data), std::move(edge_data), session);
}
}
Status GraphDBOperations::singleInsertEdge(std::vector<EdgeData>&& edge_data,
GraphDBSession& session) {
try {
auto txnWrite = session.GetSingleEdgeInsertTransaction();
for (auto& edge : edge_data) {
if (txnWrite.AddEdge(edge.src_label_id, edge.src_pk_value,
edge.dst_label_id, edge.dst_pk_value,
edge.edge_label_id, edge.property_value) == false) {
txnWrite.Abort();
throw std::runtime_error(
"Fail to create edge; All inserts are rollbacked");
}
}
txnWrite.Commit();
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA, e.what());
}
return Status::OK();
}
Status GraphDBOperations::insertEdge(std::vector<EdgeData>&& edge_data,
GraphDBSession& session) {
auto check_result = checkEdgeExists(edge_data, session);
if (check_result.ok() == false) {
return check_result;
}
if (edge_data.size() == 1) {
return singleInsertEdge(std::move(edge_data), session);
} else {
return multiInsert(std::vector<VertexData>(), std::move(edge_data),
session);
}
}
Status GraphDBOperations::updateVertex(std::vector<VertexData>&& vertex_data,
GraphDBSession& session) {
try {
const auto& vertex = vertex_data[0];
auto txnRead = session.GetReadTransaction();
vid_t vertex_lid;
if (txnRead.GetVertexIndex(vertex.label_id, vertex.pk_value, vertex_lid) ==
false) {
txnRead.Abort();
throw std::runtime_error("Vertex not exists");
}
txnRead.Commit();
auto txnWrite = session.GetUpdateTransaction();
for (int i = 0; i < int(vertex.properties.size()); i++) {
if (txnWrite.SetVertexField(vertex.label_id, vertex_lid, i,
vertex.properties[i]) == false) {
txnWrite.Abort();
throw std::runtime_error("Fail to update vertex");
}
}
txnWrite.Commit();
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA, e.what());
}
return Status::OK();
}
Status GraphDBOperations::updateEdge(std::vector<EdgeData>&& edge_data,
GraphDBSession& session) {
try {
const auto& edge = edge_data[0];
auto txn = session.GetReadTransaction();
vid_t src_vid, dst_vid;
if (txn.GetVertexIndex(edge.src_label_id, edge.src_pk_value, src_vid) ==
false ||
txn.GetVertexIndex(edge.dst_label_id, edge.dst_pk_value, dst_vid) ==
false) {
txn.Abort();
throw std::runtime_error("Vertex not found");
}
bool edge_exists = false;
for (auto edgeIt = txn.GetOutEdgeIterator(
edge.src_label_id, src_vid, edge.dst_label_id, edge.edge_label_id);
edgeIt.IsValid(); edgeIt.Next()) {
if (edgeIt.GetNeighbor() == dst_vid) {
edge_exists = true;
break;
}
}
if (!edge_exists) {
txn.Abort();
throw std::runtime_error("Edge not found");
}
txn.Commit();
auto txn2 = session.GetUpdateTransaction();
txn2.SetEdgeData(true, edge.src_label_id, src_vid, edge.dst_label_id,
dst_vid, edge.edge_label_id, edge.property_value);
txn2.Commit();
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA, e.what());
}
return Status::OK();
}
Result<rapidjson::Value> GraphDBOperations::getVertex(
std::vector<VertexData>&& vertex_data,
const std::vector<std::string>& property_names, GraphDBSession& session,
rapidjson::Document::AllocatorType& allocator) {
try {
auto& vertex = vertex_data[0];
rapidjson::Document result(rapidjson::kArrayType, &allocator);
auto txn = session.GetReadTransaction();
auto vertex_db = txn.FindVertex(vertex.label_id, vertex.pk_value);
if (vertex_db.IsValid() == false) {
txn.Abort();
throw std::runtime_error("Vertex not found");
}
for (int i = 0; i < vertex_db.FieldNum(); i++) {
rapidjson::Document values(rapidjson::kObjectType, &allocator);
values.AddMember("name", property_names[i], allocator);
values.AddMember("value", vertex_db.GetField(i).to_string(), allocator);
result.PushBack(values, allocator);
}
txn.Commit();
return Result<rapidjson::Value>(std::move(result));
} catch (std::exception& e) {
return Result<rapidjson::Value>(
Status(StatusCode::INVALID_SCHEMA, e.what()));
}
}
Result<rapidjson::Value> GraphDBOperations::getEdge(
std::vector<EdgeData>&& edge_data, const std::string& property_name,
GraphDBSession& session, rapidjson::Document::AllocatorType& allocator) {
try {
const auto& edge = edge_data[0];
rapidjson::Document result(rapidjson::kArrayType);
auto txn = session.GetReadTransaction();
vid_t src_vid, dst_vid;
if (txn.GetVertexIndex(edge.src_label_id, edge.src_pk_value, src_vid) ==
false ||
txn.GetVertexIndex(edge.dst_label_id, edge.dst_pk_value, dst_vid) ==
false) {
txn.Abort();
throw std::runtime_error("Vertex not found");
}
for (auto edgeIt = txn.GetOutEdgeIterator(
edge.src_label_id, src_vid, edge.dst_label_id, edge.edge_label_id);
edgeIt.IsValid(); edgeIt.Next()) {
if (edgeIt.GetNeighbor() != dst_vid)
continue;
rapidjson::Document push_json(rapidjson::kObjectType, &allocator);
push_json.AddMember("name", property_name, allocator);
push_json.AddMember("value", edgeIt.GetData().to_string(), allocator);
result.PushBack(push_json, allocator);
break;
}
if (result.Empty()) {
txn.Abort();
throw std::runtime_error("Edge not found");
}
txn.Commit();
return Result<rapidjson::Value>(std::move(result));
} catch (std::exception& e) {
return Result<rapidjson::Value>(
Status(StatusCode::INVALID_SCHEMA, e.what()));
}
}
} // namespace gs