flex/storages/metadata/graph_meta_store.cc (1,104 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <filesystem>
#include <fstream>
#include <iostream>
#include <string>
#include "flex/storages/metadata/graph_meta_store.h"
#include "flex/storages/rt_mutable_graph/schema.h"
#include "flex/utils/yaml_utils.h"
#include "property/types.h"
#include "service_utils.h"
#include <rapidjson/document.h>
#include <rapidjson/pointer.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
namespace gs {
// Reads the file at `file_path` line by line and returns the concatenation of
// all lines, each followed by '\n'. Logs an error and returns an empty string
// when the path does not exist or the file cannot be opened.
std::string read_file_to_string(const std::string& file_path) {
  if (!std::filesystem::exists(file_path)) {
    LOG(ERROR) << "File not exists: " << file_path;
    return "";
  }
  std::ifstream input(file_path);
  if (!input.is_open()) {
    LOG(ERROR) << "Fail to open file: " << file_path;
    return "";
  }
  std::string content;
  for (std::string current; std::getline(input, current);) {
    content += current + "\n";
  }
  input.close();
  return content;
}
const std::vector<PluginMeta>& get_builtin_plugin_metas() {
static std::vector<PluginMeta> builtin_plugins;
static bool initialized = false;
if (!initialized) {
// count_vertices
PluginMeta count_vertices;
count_vertices.id = "count_vertices";
count_vertices.name = "count_vertices";
count_vertices.description = "A builtin plugin to count vertices";
count_vertices.enable = true;
count_vertices.runnable = true;
count_vertices.type = "cypher";
count_vertices.creation_time = GetCurrentTimeStamp();
count_vertices.update_time = GetCurrentTimeStamp();
count_vertices.params.push_back({"labelName", PropertyType::kString, true});
count_vertices.returns.push_back({"count", PropertyType::kInt32, false});
builtin_plugins.push_back(count_vertices);
// pagerank
PluginMeta pagerank;
pagerank.id = "pagerank";
pagerank.name = "pagerank";
pagerank.description = "A builtin plugin to calculate pagerank";
pagerank.enable = true;
pagerank.runnable = true;
pagerank.type = "cypher";
pagerank.creation_time = GetCurrentTimeStamp();
pagerank.update_time = GetCurrentTimeStamp();
pagerank.params.push_back(
{"src_vertex_label", PropertyType::kString, true});
pagerank.params.push_back(
{"dst_vertex_label", PropertyType::kString, true});
pagerank.params.push_back({"edge_label", PropertyType::kString, true});
pagerank.params.push_back({"damping_factor", PropertyType::kDouble, false});
pagerank.params.push_back({"max_iterations", PropertyType::kInt32, false});
pagerank.params.push_back({"epsilon", PropertyType::kDouble, false});
pagerank.params.push_back({"result_limit", PropertyType::kInt32, false});
pagerank.returns.push_back({"label_name", PropertyType::kString});
pagerank.returns.push_back({"vertex_oid", PropertyType::kInt64});
pagerank.returns.push_back({"pagerank", PropertyType::kDouble});
builtin_plugins.push_back(pagerank);
// k_neighbors
PluginMeta k_neighbors;
k_neighbors.id = "k_neighbors";
k_neighbors.name = "k_neighbors";
k_neighbors.description = "A builtin plugin to calculate k_neighbors";
k_neighbors.enable = true;
k_neighbors.runnable = true;
k_neighbors.type = "cypher";
k_neighbors.creation_time = GetCurrentTimeStamp();
k_neighbors.update_time = GetCurrentTimeStamp();
k_neighbors.params.push_back({"label_name", PropertyType::kString, true});
k_neighbors.params.push_back({"oid", PropertyType::kInt64, false});
k_neighbors.params.push_back({"k", PropertyType::kInt32, false});
k_neighbors.returns.push_back({"label_name", PropertyType::kString});
k_neighbors.returns.push_back({"vertex_oid", PropertyType::kInt64});
builtin_plugins.push_back(k_neighbors);
// shortest_path_among_three
PluginMeta shortest_path_among_three;
shortest_path_among_three.id = "shortest_path_among_three";
shortest_path_among_three.name = "shortest_path_among_three";
shortest_path_among_three.description =
"A builtin plugin to calculate shortest_path_among_three";
shortest_path_among_three.enable = true;
shortest_path_among_three.runnable = true;
shortest_path_among_three.type = "cypher";
shortest_path_among_three.creation_time = GetCurrentTimeStamp();
shortest_path_among_three.update_time = GetCurrentTimeStamp();
shortest_path_among_three.params.push_back(
{"label_name1", PropertyType::kString, true});
shortest_path_among_three.params.push_back(
{"oid1", PropertyType::kString, false});
shortest_path_among_three.params.push_back(
{"label_name2", PropertyType::kString, true});
shortest_path_among_three.params.push_back(
{"oid2", PropertyType::kString, false});
shortest_path_among_three.params.push_back(
{"label_name3", PropertyType::kString, true});
shortest_path_among_three.params.push_back(
{"oid3", PropertyType::kString, false});
shortest_path_among_three.returns.push_back(
{"path", PropertyType::kString});
builtin_plugins.push_back(shortest_path_among_three);
initialized = true;
}
return builtin_plugins;
}
// Appends every builtin plugin meta to the end of `plugin_metas`.
void append_builtin_plugins(std::vector<PluginMeta>& plugin_metas) {
  // Bind by reference: get_builtin_plugin_metas() returns a const reference,
  // and a plain `auto` would copy the whole vector before inserting from it.
  const auto& builtin_plugin_metas = get_builtin_plugin_metas();
  plugin_metas.insert(plugin_metas.end(), builtin_plugin_metas.begin(),
                      builtin_plugin_metas.end());
}
UpdateGraphMetaRequest::UpdateGraphMetaRequest(
int64_t data_update_time, const std::string& data_import_config)
: data_update_time(data_update_time),
data_import_config(data_import_config) {}
std::string Parameter::ToJson() const {
rapidjson::Document json(rapidjson::kObjectType);
json.AddMember("name", name, json.GetAllocator());
json.AddMember("type", to_json(type, &json.GetAllocator()),
json.GetAllocator());
json.AddMember("allow_cast", allow_cast, json.GetAllocator());
return rapidjson_stringify(json);
}
// Writes this graph meta into `json` (expected to be a JSON object), using
// `allocator` for all allocations.
//
// Members written: version, id, name, description, creation_time,
// data_update_time, data_import_config (only when non-empty and parseable),
// schema (only when parseable), stored_procedures and store_type (only when
// non-empty).
//
// store_type is emitted so that a ToJson/FromJson round trip keeps the value:
// FromJson reads "store_type" and would otherwise always fall back to its
// default.
void GraphMeta::ToJson(rapidjson::Value& json,
                       rapidjson::Document::AllocatorType& allocator) const {
  json.AddMember("version", version, allocator);
  json.AddMember("id", id, allocator);
  json.AddMember("name", name, allocator);
  json.AddMember("description", description, allocator);
  json.AddMember("creation_time", creation_time, allocator);
  json.AddMember("data_update_time", data_update_time, allocator);
  if (!data_import_config.empty()) {
    // data_import_config is kept as a JSON string; embed it as a nested value.
    rapidjson::Document tempDoc(rapidjson::kObjectType, &allocator);
    if (tempDoc.Parse(data_import_config.c_str()).HasParseError()) {
      LOG(ERROR) << "Invalid data_import_config: " << data_import_config;
    } else {
      json.AddMember("data_import_config", tempDoc, allocator);
    }
  }
  {
    // schema is kept as a JSON string as well.
    rapidjson::Document tempDoc(rapidjson::kObjectType, &allocator);
    if (tempDoc.Parse(schema.c_str()).HasParseError()) {
      LOG(ERROR) << "Invalid schema: " << schema;
    } else {
      json.AddMember("schema", tempDoc, allocator);
    }
  }
  rapidjson::Document stored_procedures(rapidjson::kArrayType, &allocator);
  for (const auto& plugin_meta : plugin_metas) {
    rapidjson::Document tempDoc(rapidjson::kObjectType, &allocator);
    plugin_meta.ToJson(tempDoc, allocator);
    stored_procedures.PushBack(tempDoc, allocator);
  }
  json.AddMember("stored_procedures", stored_procedures, allocator);
  // Skip an empty store_type so FromJson can still apply its default.
  if (!store_type.empty()) {
    json.AddMember("store_type", store_type, allocator);
  }
}
std::string GraphMeta::ToJson() const {
rapidjson::Document json(rapidjson::kObjectType);
ToJson(json, json.GetAllocator());
return rapidjson_stringify(json);
}
// Parses `json_str` and builds a GraphMeta from it. When the string is not
// valid JSON, logs an error and returns a default-constructed GraphMeta.
GraphMeta GraphMeta::FromJson(const std::string& json_str) {
  rapidjson::Document doc(rapidjson::kObjectType);
  doc.Parse(json_str.c_str());
  if (!doc.HasParseError()) {
    return GraphMeta::FromJson(doc);
  }
  LOG(ERROR) << "Invalid json string: " << json_str;
  return GraphMeta();
}
// Builds a GraphMeta from a JSON object.
//
// Optional members and their fallbacks: "version" ("v0.1"), "id",
// "data_update_time" (0), "data_import_config", "stored_procedures" (used
// only when it is an array) and "store_type" ("mutable_csr").
// "name", "description", "creation_time" and "schema" are read
// unconditionally, so the caller must make sure they are present.
//
// "id" may be given as an integer or as a string. Integers are converted to
// their decimal representation: assigning the number directly to the
// string-typed id would pick the single-character assignment and store one
// byte instead of the digits.
GraphMeta GraphMeta::FromJson(const rapidjson::Value& json) {
  GraphMeta meta;
  if (json.HasMember("version")) {
    meta.version = json["version"].GetString();
  } else {
    meta.version = "v0.1";
  }
  if (json.HasMember("id")) {
    const auto& id_value = json["id"];
    // IsInt64() also holds for values that fit in a plain int.
    if (id_value.IsInt64()) {
      meta.id = std::to_string(id_value.GetInt64());
    } else {
      meta.id = id_value.GetString();
    }
  }
  meta.name = json["name"].GetString();
  meta.description = json["description"].GetString();
  meta.creation_time = json["creation_time"].GetInt64();
  // The schema is stored in its serialized JSON form.
  meta.schema = rapidjson_stringify(json["schema"]);
  if (json.HasMember("data_update_time")) {
    meta.data_update_time = json["data_update_time"].GetInt64();
  } else {
    meta.data_update_time = 0;
  }
  if (json.HasMember("data_import_config")) {
    meta.data_import_config = rapidjson_stringify(json["data_import_config"]);
  }
  if (json.HasMember("stored_procedures") &&
      json["stored_procedures"].IsArray()) {
    for (auto& plugin : json["stored_procedures"].GetArray()) {
      meta.plugin_metas.push_back(PluginMeta::FromJson(plugin));
    }
  }
  if (json.HasMember("store_type")) {
    meta.store_type = json["store_type"].GetString();
  } else {
    meta.store_type = "mutable_csr";
  }
  return meta;
}
// Parses `json_str` and builds a PluginMeta from it. When the string is not
// valid JSON, logs an error and returns a default-constructed PluginMeta.
PluginMeta PluginMeta::FromJson(const std::string& json_str) {
  rapidjson::Document doc(rapidjson::kObjectType);
  doc.Parse(json_str.c_str());
  if (!doc.HasParseError()) {
    return PluginMeta::FromJson(doc);
  }
  LOG(ERROR) << "Invalid json string: " << json_str;
  return PluginMeta();
}
// Builds a PluginMeta from a JSON object. Every member is optional; members
// that are absent keep the value of a default-constructed PluginMeta, except
// "type", which falls back to "cpp".
//
// "id" may be given as an integer or as a string. Integers are converted to
// their decimal representation: assigning the number directly to the string
// id would pick the single-character assignment and store one byte instead
// of the digits. When the id is still empty after reading "id", the plugin
// name is used as the id.
PluginMeta PluginMeta::FromJson(const rapidjson::Value& json) {
  PluginMeta meta;
  if (json.HasMember("id")) {
    const auto& id_value = json["id"];
    // IsInt64() also holds for values that fit in a plain int.
    if (id_value.IsInt64()) {
      meta.id = std::to_string(id_value.GetInt64());
    } else {
      meta.id = id_value.GetString();
    }
  }
  if (json.HasMember("name")) {
    meta.name = json["name"].GetString();
    if (meta.id.empty()) {
      meta.id = meta.name;
    }
  }
  if (json.HasMember("bound_graph")) {
    meta.bound_graph = json["bound_graph"].GetString();
  }
  if (json.HasMember("description")) {
    meta.description = json["description"].GetString();
  }
  if (json.HasMember("params")) {
    meta.setParamsFromJsonString((json["params"]));
  }
  if (json.HasMember("returns")) {
    meta.setReturnsFromJsonString(json["returns"]);
  }
  if (json.HasMember("library")) {
    meta.library = json["library"].GetString();
  }
  if (json.HasMember("query")) {
    meta.query = json["query"].GetString();
  }
  if (json.HasMember("type")) {
    meta.type = json["type"].GetString();
  } else {
    meta.type = "cpp";  // default is cpp
  }
  if (json.HasMember("option")) {
    meta.setOptionFromJsonString(rapidjson_stringify(json["option"]));
  }
  if (json.HasMember("creation_time")) {
    meta.creation_time = json["creation_time"].GetInt64();
  }
  if (json.HasMember("update_time")) {
    meta.update_time = json["update_time"].GetInt64();
  }
  if (json.HasMember("enable")) {
    meta.enable = json["enable"].GetBool();
  }
  if (json.HasMember("runnable")) {
    meta.runnable = json["runnable"].GetBool();
  }
  return meta;
}
// Writes this plugin meta into `json` (expected to be a JSON object), using
// `allocator` for all allocations. Members are added in this order: id, name,
// bound_graph, description, params, returns, option, creation_time,
// update_time, enable, runnable, library, query, type.
void PluginMeta::ToJson(rapidjson::Value& json,
                        rapidjson::Document::AllocatorType& allocator) const {
  json.AddMember("id", id, allocator);
  json.AddMember("name", name, allocator);
  json.AddMember("bound_graph", bound_graph, allocator);
  json.AddMember("description", description, allocator);

  // Input parameters: an array of {name, type, allow_cast} objects.
  rapidjson::Document param_array(rapidjson::kArrayType, &allocator);
  for (const auto& input : params) {
    rapidjson::Document entry(rapidjson::kObjectType, &allocator);
    entry.AddMember("name", input.name, allocator);
    entry.AddMember("type", to_json(input.type, &allocator), allocator);
    entry.AddMember("allow_cast", input.allow_cast, allocator);
    param_array.PushBack(entry, allocator);
  }
  json.AddMember("params", param_array, allocator);

  // Return columns: same layout as the parameters.
  rapidjson::Document return_array(rapidjson::kArrayType, &allocator);
  for (const auto& output : returns) {
    rapidjson::Document entry(rapidjson::kObjectType, &allocator);
    entry.AddMember("name", output.name, allocator);
    entry.AddMember("type", to_json(output.type, &allocator), allocator);
    entry.AddMember("allow_cast", output.allow_cast, allocator);
    return_array.PushBack(entry, allocator);
  }
  json.AddMember("returns", return_array, allocator);

  // Options: a flat object of string keys and string values.
  rapidjson::Document option_object(rapidjson::kObjectType, &allocator);
  for (const auto& entry : option) {
    rapidjson::Value option_key(entry.first.c_str(), allocator);
    rapidjson::Value option_value(entry.second.c_str(), allocator);
    option_object.AddMember(option_key, option_value, allocator);
  }
  json.AddMember("option", option_object, allocator);

  json.AddMember("creation_time", creation_time, allocator);
  json.AddMember("update_time", update_time, allocator);
  json.AddMember("enable", enable, allocator);
  json.AddMember("runnable", runnable, allocator);
  json.AddMember("library", library, allocator);
  json.AddMember("query", query, allocator);
  json.AddMember("type", type, allocator);
}
std::string PluginMeta::ToJson() const {
rapidjson::Document json(rapidjson::kObjectType);
ToJson(json, json.GetAllocator());
return rapidjson_stringify(json);
}
// Appends one Parameter to `params` for every element of the JSON array
// `document`. Each element must carry "name" and "type"; "allow_cast" is
// optional and defaults to false. A non-array value is logged and ignored.
void PluginMeta::setParamsFromJsonString(const rapidjson::Value& document) {
  if (!document.IsArray()) {
    LOG(ERROR) << "Invalid params string, expected array: "
               << rapidjson_stringify(document);
    return;
  }
  for (auto& item : document.GetArray()) {
    Parameter parsed;
    parsed.name = item["name"].GetString();
    parsed.type = from_json(item["type"]);
    parsed.allow_cast =
        item.HasMember("allow_cast") ? item["allow_cast"].GetBool() : false;
    params.push_back(parsed);
  }
}
// Appends one Parameter to `returns` for every element of the JSON array
// `value`. Each element must carry "name" and "type"; "allow_cast" is
// optional and defaults to false. A non-array value is logged and ignored.
void PluginMeta::setReturnsFromJsonString(const rapidjson::Value& value) {
  if (!value.IsArray()) {
    LOG(ERROR) << "Invalid returns string, expected array: "
               << rapidjson_stringify(value);
    return;
  }
  for (auto& item : value.GetArray()) {
    Parameter parsed;
    parsed.name = item["name"].GetString();
    parsed.type = from_json(item["type"]);
    parsed.allow_cast =
        item.HasMember("allow_cast") ? item["allow_cast"].GetBool() : false;
    returns.push_back(parsed);
  }
}
void PluginMeta::setOptionFromJsonString(const std::string& json_str) {
rapidjson::Document document(rapidjson::kObjectType);
if (document.Parse(json_str.c_str()).HasParseError()) {
LOG(ERROR) << "Invalid option string: " << json_str;
return;
}
if (document.IsObject()) {
for (auto& opt : document.GetObject()) {
option[opt.name.GetString()] = opt.value.GetString();
}
} else {
LOG(ERROR) << "Invalid option string, expected object: " << json_str;
}
}
std::string JobMeta::ToJson(bool print_log) const {
rapidjson::Document json(rapidjson::kObjectType);
json.AddMember("id", id, json.GetAllocator());
json.AddMember("status", std::to_string(status), json.GetAllocator());
json.AddMember("start_time", start_time, json.GetAllocator());
json.AddMember("end_time", end_time, json.GetAllocator());
if (print_log) {
json.AddMember("log", read_file_to_string(log_path), json.GetAllocator());
} else {
json.AddMember("log_path", log_path, json.GetAllocator());
}
json.AddMember("detail", rapidjson::kObjectType, json.GetAllocator());
json["detail"].AddMember("graph_id", graph_id, json.GetAllocator());
json["detail"].AddMember("process_id", process_id, json.GetAllocator());
json.AddMember("type", type, json.GetAllocator());
return rapidjson_stringify(json);
}
// Parses `json_str` and builds a JobMeta from it. When the string is not
// valid JSON, logs an error and returns a default-constructed JobMeta.
JobMeta JobMeta::FromJson(const std::string& json_str) {
  rapidjson::Document doc(rapidjson::kObjectType);
  doc.Parse(json_str.c_str());
  if (!doc.HasParseError()) {
    return JobMeta::FromJson(doc);
  }
  LOG(ERROR) << "Invalid json string: " << json_str;
  return JobMeta();
}
// Builds a JobMeta from a JSON object. Every member is optional; members that
// are absent keep the value of a default-constructed JobMeta.
//
// "id" and "detail.graph_id" may be given as integers or as strings. Integers
// are converted to their decimal representation: assigning the number
// directly to the string-typed field would pick the single-character
// assignment and store one byte instead of the digits.
JobMeta JobMeta::FromJson(const rapidjson::Value& json) {
  JobMeta meta;
  if (json.HasMember("id")) {
    const auto& id_value = json["id"];
    // IsInt64() also holds for values that fit in a plain int, so a separate
    // IsInt() branch after it could never be taken.
    if (id_value.IsInt64()) {
      meta.id = std::to_string(id_value.GetInt64());
    } else {
      meta.id = id_value.GetString();
    }
  }
  if (json.HasMember("detail")) {
    const auto& detail = json["detail"];
    if (detail.HasMember("graph_id")) {
      const auto& graph_id_value = detail["graph_id"];
      if (graph_id_value.IsInt64()) {
        meta.graph_id = std::to_string(graph_id_value.GetInt64());
      } else {
        meta.graph_id = graph_id_value.GetString();
      }
    }
    if (detail.HasMember("process_id")) {
      meta.process_id = detail["process_id"].GetInt();
    }
  }
  if (json.HasMember("start_time")) {
    meta.start_time = json["start_time"].GetInt64();
  }
  if (json.HasMember("end_time")) {
    meta.end_time = json["end_time"].GetInt64();
  }
  if (json.HasMember("status")) {
    meta.status = parseFromString(json["status"].GetString());
  }
  if (json.HasMember("log_path")) {
    meta.log_path = json["log_path"].GetString();
    VLOG(10) << "log_path: " << meta.log_path;
  }
  if (json.HasMember("type")) {
    meta.type = json["type"].GetString();
  }
  return meta;
}
// Gives every entry of root[type_name] that has no "x_csr_params" a default
// x_csr_params.max_vertex_num of 8192, then replaces root[type_name] with the
// resulting sequence and returns `root`.
//
// The small default keeps memory usage low when opening an empty graph whose
// vertex types do not specify their own limit.
gs::Result<YAML::Node> preprocess_vertex_schema(YAML::Node root,
                                                const std::string& type_name) {
  YAML::Node patched_types;
  auto declared_types = root[type_name];
  for (auto entry : declared_types) {
    const bool has_csr_params = static_cast<bool>(entry["x_csr_params"]);
    if (!has_csr_params) {
      entry["x_csr_params"]["max_vertex_num"] = 8192;
    }
    patched_types.push_back(entry);
  }
  root[type_name] = patched_types;
  return root;
}
// Normalizes the "type_id" of every entry in root[type_name] and the
// "property_id" of every entry in its "properties".
//
// Ids are expected to follow the declaration order starting at 0. A missing
// id is filled in with the expected value; an explicit id that differs from
// the expected value yields an INVALID_SCHEMA status. On success
// root[type_name] is replaced with the processed sequence and `root` is
// returned.
gs::Result<YAML::Node> preprocess_vertex_edge_types(
    YAML::Node root, const std::string& type_name) {
  YAML::Node processed_types;
  int32_t expected_type_id = 0;
  auto declared_types = root[type_name];
  for (auto entry : declared_types) {
    if (!entry["type_id"]) {
      entry["type_id"] = expected_type_id;
    } else {
      const auto declared_id = entry["type_id"].as<int32_t>();
      if (declared_id != expected_type_id) {
        return gs::Status(gs::StatusCode::INVALID_SCHEMA,
                          "Invalid " + type_name +
                              " type_id: " + std::to_string(declared_id) +
                              ", expect: " + std::to_string(expected_type_id));
      }
    }
    ++expected_type_id;

    // Property ids restart from 0 for every type.
    int32_t expected_prop_id = 0;
    if (entry["properties"]) {
      for (auto property : entry["properties"]) {
        if (!property["property_id"]) {
          property["property_id"] = expected_prop_id;
        } else {
          const auto declared_prop_id = property["property_id"].as<int32_t>();
          if (declared_prop_id != expected_prop_id) {
            return gs::Status(
                gs::StatusCode::INVALID_SCHEMA,
                "Invalid " + type_name + " property_id: " +
                    entry["type_name"].as<std::string>() + " : " +
                    std::to_string(declared_prop_id) +
                    ", expect: " + std::to_string(expected_prop_id));
          }
        }
        ++expected_prop_id;
      }
    }
    processed_types.push_back(entry);
  }
  root[type_name] = processed_types;
  return root;
}
// Preprocesses the graph schema so that it is compatible with the current
// storage:
// 1. Explicit type_id / property_id values must match the declaration order;
//    a mismatch is reported as an invalid schema.
// 2. Missing type_id / property_id values are assigned from the declaration
//    order.
// 3. Vertex types without x_csr_params receive a default.
// The node must contain schema.vertex_types; schema.edge_types is optional.
gs::Result<YAML::Node> preprocess_graph_schema(YAML::Node&& node) {
  if (!node["schema"] || !node["schema"]["vertex_types"]) {
    return gs::Status(gs::StatusCode::INVALID_SCHEMA, "Invalid graph schema: ");
  }
  YAML::Node schema_section = node["schema"];
  // Vertex types: normalize ids first, then fill in storage defaults.
  ASSIGN_AND_RETURN_IF_RESULT_NOT_OK(
      schema_section,
      preprocess_vertex_edge_types(schema_section, "vertex_types"));
  ASSIGN_AND_RETURN_IF_RESULT_NOT_OK(
      schema_section, preprocess_vertex_schema(schema_section, "vertex_types"));
  // Edge types may be omitted altogether.
  if (node["schema"]["edge_types"]) {
    ASSIGN_AND_RETURN_IF_RESULT_NOT_OK(
        schema_section,
        preprocess_vertex_edge_types(schema_section, "edge_types"));
  }
  node["schema"] = schema_section;
  return node;
}
// Validates and normalizes a graph definition given as a JSON string.
//
// Steps:
// 1. Check that `raw_json_str` is valid JSON, then load it as a YAML node.
// 2. Run preprocess_graph_schema on it.
// 3. Set "store_type" to "mutable_csr" when it is absent.
// 4. Check that gs::Schema can be loaded from the result.
//
// Returns the processed definition as a JSON string, or the failing status.
// Parse failures are reported as INVALID_SCHEMA.
Result<std::string> preprocess_and_check_schema_json_string(
    const std::string& raw_json_str) {
  YAML::Node yaml;
  try {
    rapidjson::Document doc;
    if (doc.Parse(raw_json_str).HasParseError()) {
      // Report the failure directly. Throwing here only to catch it a few
      // lines below doubled the "Fail to parse json: " prefix in the message.
      const std::string message =
          "Fail to parse json: " + std::to_string(doc.GetParseError());
      LOG(ERROR) << message;
      return gs::Result<std::string>(
          gs::Status(gs::StatusCode::INVALID_SCHEMA, message));
    }
    std::stringstream json_ss;
    json_ss << raw_json_str;
    yaml = YAML::Load(json_ss);
  } catch (const std::exception& e) {
    LOG(ERROR) << "Fail to parse json: " << e.what();
    return gs::Result<std::string>(
        gs::Status(gs::StatusCode::INVALID_SCHEMA,
                   "Fail to parse json: " + std::string(e.what())));
  } catch (...) {
    LOG(ERROR) << "Fail to parse json: " << raw_json_str;
    return gs::Result<std::string>(
        gs::Status(gs::StatusCode::INVALID_SCHEMA, "Fail to parse json: "));
  }
  // preprocess the schema yaml,
  auto res_yaml = preprocess_graph_schema(std::move(yaml));
  if (!res_yaml.ok()) {
    return gs::Result<std::string>(res_yaml.status());
  }
  auto& yaml_value = res_yaml.value();
  // set default value
  if (!yaml_value["store_type"]) {
    yaml_value["store_type"] = "mutable_csr";
  }
  auto parse_schema_res = gs::Schema::LoadFromYamlNode(yaml_value);
  if (!parse_schema_res.ok()) {
    return gs::Result<std::string>(parse_schema_res.status());
  }
  return gs::get_json_string_from_yaml(yaml_value);
}
// Builds a CreateGraphMetaRequest from a JSON string.
//
// Optional members and their fallbacks: "version" ("v0.1"), "name",
// "description", "schema" (stored in serialized form), "data_update_time"
// (0), "creation_time" (current timestamp) and "stored_procedures" (used only
// when it is an array). The builtin plugins are always appended after the
// plugins read from the request.
//
// Returns an error status when `json_str` is not valid JSON, so that callers
// checking the Result do not mistake an empty request for a successful parse.
// INVALID_SCHEMA is used because it is the status code this file already
// reports for unparseable graph definitions.
Result<CreateGraphMetaRequest> CreateGraphMetaRequest::FromJson(
    const std::string& json_str) {
  LOG(INFO) << "CreateGraphMetaRequest::FromJson: " << json_str;
  CreateGraphMetaRequest request;
  rapidjson::Document json(rapidjson::kObjectType);
  if (json.Parse(json_str.c_str()).HasParseError()) {
    LOG(ERROR) << "CreateGraphMetaRequest::FromJson error: " << json_str;
    return Result<CreateGraphMetaRequest>(gs::Status(
        gs::StatusCode::INVALID_SCHEMA,
        "CreateGraphMetaRequest::FromJson error: invalid json, code " +
            std::to_string(json.GetParseError())));
  }
  if (json.HasMember("version")) {
    request.version = json["version"].GetString();
  } else {
    request.version = "v0.1";
  }
  if (json.HasMember("name")) {
    request.name = json["name"].GetString();
  }
  if (json.HasMember("description")) {
    request.description = json["description"].GetString();
  }
  if (json.HasMember("schema")) {
    request.schema = rapidjson_stringify(json["schema"]);
  }
  if (json.HasMember("data_update_time")) {
    request.data_update_time = json["data_update_time"].GetInt64();
  } else {
    request.data_update_time = 0;
  }
  if (json.HasMember("creation_time")) {
    request.creation_time = json["creation_time"].GetInt64();
  } else {
    request.creation_time = GetCurrentTimeStamp();
  }
  if (json.HasMember("stored_procedures") &&
      json["stored_procedures"].IsArray()) {
    for (auto& plugin : json["stored_procedures"].GetArray()) {
      request.plugin_metas.push_back(PluginMeta::FromJson(plugin));
    }
  }
  // Add builtin plugins
  append_builtin_plugins(request.plugin_metas);
  return request;
}
// Serializes this request to a JSON object string with the members version,
// name, description, schema (only when it parses as JSON), data_update_time
// (0 when unset), creation_time and stored_procedures.
std::string CreateGraphMetaRequest::ToString() const {
  rapidjson::Document json(rapidjson::kObjectType);
  auto& allocator = json.GetAllocator();
  json.AddMember("version", version, allocator);
  json.AddMember("name", name, allocator);
  json.AddMember("description", description, allocator);
  {
    // schema is kept as a JSON string; embed it as a nested value.
    rapidjson::Document schema_doc(rapidjson::kObjectType, &allocator);
    if (schema_doc.Parse(schema.c_str()).HasParseError()) {
      LOG(ERROR) << "Invalid schema: " << schema;
    } else {
      json.AddMember("schema", schema_doc, allocator);
    }
  }
  if (data_update_time.has_value()) {
    json.AddMember("data_update_time", data_update_time.value(), allocator);
  } else {
    json.AddMember("data_update_time", 0, allocator);
  }
  json.AddMember("creation_time", creation_time, allocator);
  rapidjson::Document stored_procedures(rapidjson::kArrayType, &allocator);
  for (const auto& plugin_meta : plugin_metas) {
    // Write each plugin straight into the array element instead of
    // stringifying it and parsing the string back.
    rapidjson::Document tempDoc(rapidjson::kObjectType, &allocator);
    plugin_meta.ToJson(tempDoc, allocator);
    stored_procedures.PushBack(tempDoc, allocator);
  }
  json.AddMember("stored_procedures", stored_procedures, allocator);
  return rapidjson_stringify(json);
}
CreatePluginMetaRequest::CreatePluginMetaRequest() : enable(true) {}
std::string CreatePluginMetaRequest::paramsString() const {
rapidjson::Document json(rapidjson::kArrayType);
for (auto& param : params) {
rapidjson::Document param_json(rapidjson::kObjectType,
&json.GetAllocator());
param_json.AddMember("name", param.name, json.GetAllocator());
param_json.AddMember("type", to_json(param.type, &json.GetAllocator()),
json.GetAllocator());
json.PushBack(param_json, json.GetAllocator());
}
return rapidjson_stringify(json);
}
std::string CreatePluginMetaRequest::returnsString() const {
rapidjson::Document json(rapidjson::kArrayType);
for (auto& ret : returns) {
rapidjson::Document ret_json(rapidjson::kObjectType, &json.GetAllocator());
ret_json.AddMember("name", ret.name, json.GetAllocator());
ret_json.AddMember("type", to_json(ret.type, &json.GetAllocator()),
json.GetAllocator());
json.PushBack(ret_json, json.GetAllocator());
}
return rapidjson_stringify(json);
}
std::string CreatePluginMetaRequest::optionString() const {
rapidjson::Document json(rapidjson::kObjectType);
for (auto& opt : option) {
json.AddMember(rapidjson::Value(opt.first.c_str(), json.GetAllocator()),
rapidjson::Value(opt.second.c_str(), json.GetAllocator()),
json.GetAllocator());
}
return rapidjson_stringify(json);
}
std::string CreatePluginMetaRequest::ToString() const {
rapidjson::Document json(rapidjson::kObjectType);
if (id.has_value()) {
json.AddMember("id", id.value(), json.GetAllocator());
}
json.AddMember("bound_graph", bound_graph, json.GetAllocator());
json.AddMember("name", name, json.GetAllocator());
json.AddMember("creation_time", creation_time, json.GetAllocator());
json.AddMember("description", description, json.GetAllocator());
json.AddMember("params", rapidjson::kArrayType, json.GetAllocator());
for (auto& param : params) {
rapidjson::Document param_json(rapidjson::kObjectType,
&json.GetAllocator());
param_json.AddMember("name", param.name, json.GetAllocator());
param_json.AddMember("type", to_json(param.type, &json.GetAllocator()),
json.GetAllocator());
json["params"].PushBack(param_json, json.GetAllocator());
}
json.AddMember("returns", rapidjson::kArrayType, json.GetAllocator());
for (auto& ret : returns) {
rapidjson::Document ret_json(rapidjson::kObjectType, &json.GetAllocator());
ret_json.AddMember("name", ret.name, json.GetAllocator());
ret_json.AddMember("type", to_json(ret.type, &json.GetAllocator()),
json.GetAllocator());
json["returns"].PushBack(ret_json, json.GetAllocator());
}
json.AddMember("library", library, json.GetAllocator());
json.AddMember("option", rapidjson::kObjectType, json.GetAllocator());
for (auto& opt : option) {
json["option"].AddMember(
rapidjson::Value(opt.first.c_str(), json.GetAllocator()),
rapidjson::Value(opt.second.c_str(), json.GetAllocator()),
json.GetAllocator());
}
json.AddMember("query", query, json.GetAllocator());
json.AddMember("type", type, json.GetAllocator());
json.AddMember("enable", enable, json.GetAllocator());
return rapidjson_stringify(json);
}
// Parses `json` and builds a CreatePluginMetaRequest from it. When the string
// is not valid JSON, logs an error and returns a default-constructed request.
CreatePluginMetaRequest CreatePluginMetaRequest::FromJson(
    const std::string& json) {
  rapidjson::Document parsed(rapidjson::kObjectType);
  parsed.Parse(json.c_str());
  if (!parsed.HasParseError()) {
    return CreatePluginMetaRequest::FromJson(parsed);
  }
  LOG(ERROR) << "CreatePluginMetaRequest::FromJson error: " << json;
  return CreatePluginMetaRequest();
}
// Builds a CreatePluginMetaRequest from a JSON object. Every member is
// optional; "creation_time" falls back to the current timestamp, the others
// keep the value of a default-constructed request.
//
// "id" and "bound_graph" may be given as integers or as strings. Integers are
// converted to their decimal representation. bound_graph used to be assigned
// the raw number, which for a string picks the single-character assignment
// and stores one byte instead of the digits.
//
// "params" and "returns", when present, are read as arrays whose elements
// carry "name", "type" and optionally "allow_cast"; "option", when present,
// is read as an object with string values.
CreatePluginMetaRequest CreatePluginMetaRequest::FromJson(
    const rapidjson::Value& j) {
  // TODO: make sure this is correct
  CreatePluginMetaRequest request;
  if (j.HasMember("id")) {
    const auto& id_value = j["id"];
    // IsInt64() also holds for values that fit in a plain int.
    if (id_value.IsInt64()) {
      request.id = std::to_string(id_value.GetInt64());
    } else {
      request.id = id_value.GetString();
    }
  }
  if (j.HasMember("name")) {
    request.name = j["name"].GetString();
  }
  if (j.HasMember("bound_graph")) {
    const auto& graph_value = j["bound_graph"];
    if (graph_value.IsInt64()) {
      request.bound_graph = std::to_string(graph_value.GetInt64());
    } else {
      request.bound_graph = graph_value.GetString();
    }
  }
  if (j.HasMember("creation_time")) {
    request.creation_time = j["creation_time"].GetInt64();
  } else {
    request.creation_time = GetCurrentTimeStamp();
  }
  if (j.HasMember("description")) {
    request.description = j["description"].GetString();
  }
  if (j.HasMember("params")) {
    for (auto& param : j["params"].GetArray()) {
      Parameter p;
      p.name = param["name"].GetString();
      from_json(param["type"], p.type);
      if (param.HasMember("allow_cast")) {
        p.allow_cast = param["allow_cast"].GetBool();
      }
      request.params.push_back(p);
    }
  }
  if (j.HasMember("returns")) {
    for (auto& ret : j["returns"].GetArray()) {
      Parameter p;
      p.name = ret["name"].GetString();
      from_json(ret["type"], p.type);
      if (ret.HasMember("allow_cast")) {
        p.allow_cast = ret["allow_cast"].GetBool();
      }
      request.returns.push_back(p);
    }
  }
  if (j.HasMember("library")) {
    request.library = j["library"].GetString();
  }
  if (j.HasMember("option")) {
    for (auto& opt : j["option"].GetObject()) {
      request.option[opt.name.GetString()] = opt.value.GetString();
    }
  }
  if (j.HasMember("query")) {
    request.query = j["query"].GetString();
  }
  if (j.HasMember("type")) {
    request.type = j["type"].GetString();
  }
  if (j.HasMember("enable")) {
    request.enable = j["enable"].GetBool();
  }
  return request;
}
UpdatePluginMetaRequest::UpdatePluginMetaRequest() : enable(true) {}
// Builds an UpdatePluginMetaRequest from a JSON string.
//
// When `json` is not valid JSON, logs an error and returns a
// default-constructed request. Otherwise each member that is present is
// copied into the request: name (an integer is converted to its decimal
// string), description, update_time (current timestamp when absent), params
// and returns (only when they are arrays), library, option and enable.
UpdatePluginMetaRequest UpdatePluginMetaRequest::FromJson(
    const std::string& json) {
  UpdatePluginMetaRequest request;
  rapidjson::Document doc(rapidjson::kObjectType);
  if (doc.Parse(json.c_str()).HasParseError()) {
    LOG(ERROR) << "UpdatePluginMetaRequest::FromJson error: " << json;
    return request;
  }

  // Converts a JSON array of {name, type[, allow_cast]} objects.
  const auto parse_parameters = [](const rapidjson::Value& array) {
    std::vector<Parameter> parsed;
    for (auto& item : array.GetArray()) {
      Parameter current;
      current.name = item["name"].GetString();
      current.type = from_json(item["type"]);
      if (item.HasMember("allow_cast")) {
        current.allow_cast = item["allow_cast"].GetBool();
      }
      parsed.emplace_back(std::move(current));
    }
    return parsed;
  };

  if (doc.HasMember("name")) {
    const auto& name_value = doc["name"];
    if (name_value.IsInt()) {
      request.name = std::to_string(name_value.GetInt());
    } else if (name_value.IsInt64()) {
      request.name = std::to_string(name_value.GetInt64());
    } else {
      request.name = name_value.GetString();
    }
  }
  if (doc.HasMember("description")) {
    request.description = doc["description"].GetString();
  }
  if (!doc.HasMember("update_time")) {
    request.update_time = GetCurrentTimeStamp();
  } else {
    request.update_time = doc["update_time"].GetInt64();
  }
  if (doc.HasMember("params") && doc["params"].IsArray()) {
    request.params = parse_parameters(doc["params"]);
  }
  if (doc.HasMember("returns") && doc["returns"].IsArray()) {
    request.returns = parse_parameters(doc["returns"]);
  }
  if (doc.HasMember("library")) {
    request.library = doc["library"].GetString();
  }
  if (doc.HasMember("option")) {
    std::unordered_map<std::string, std::string> parsed_option;
    for (auto& member : doc["option"].GetObject()) {
      parsed_option.insert(
          {member.name.GetString(), member.value.GetString()});
    }
    request.option = std::move(parsed_option);
  }
  if (doc.HasMember("enable")) {
    request.enable = doc["enable"].GetBool();
  }
  return request;
}
std::string UpdatePluginMetaRequest::paramsString() const {
rapidjson::Document json(rapidjson::kArrayType);
if (params.has_value()) {
for (auto& param : params.value()) {
rapidjson::Document param_json(rapidjson::kObjectType,
&json.GetAllocator());
param_json.AddMember("name", param.name, json.GetAllocator());
param_json.AddMember("type", to_json(param.type, &json.GetAllocator()),
json.GetAllocator());
json.PushBack(param_json, json.GetAllocator());
}
}
return rapidjson_stringify(json);
}
std::string UpdatePluginMetaRequest::returnsString() const {
rapidjson::Document json(rapidjson::kArrayType);
if (returns.has_value()) {
for (auto& ret : returns.value()) {
rapidjson::Document ret_json(rapidjson::kObjectType,
&json.GetAllocator());
ret_json.AddMember("name", ret.name, json.GetAllocator());
ret_json.AddMember("type", to_json(ret.type, &json.GetAllocator()),
json.GetAllocator());
json.PushBack(ret_json, json.GetAllocator());
}
}
return rapidjson_stringify(json);
}
std::string UpdatePluginMetaRequest::optionString() const {
rapidjson::Document json(rapidjson::kObjectType);
if (option.has_value()) {
for (auto& opt : option.value()) {
json.AddMember(rapidjson::Value(opt.first.c_str(), json.GetAllocator()),
rapidjson::Value(opt.second.c_str(), json.GetAllocator()),
json.GetAllocator());
}
}
return rapidjson_stringify(json);
}
std::string UpdatePluginMetaRequest::ToString() const {
rapidjson::Document json(rapidjson::kObjectType);
if (name.has_value()) {
json.AddMember("name", name.value(), json.GetAllocator());
}
if (bound_graph.has_value()) {
json.AddMember("bound_graph", bound_graph.value(), json.GetAllocator());
}
if (description.has_value()) {
json.AddMember("description", description.value(), json.GetAllocator());
}
if (update_time.has_value()) {
json.AddMember("update_time", update_time.value(), json.GetAllocator());
}
// create array of json objects
rapidjson::Document params_json(rapidjson::kArrayType, &json.GetAllocator());
if (params.has_value()) {
for (auto& param : params.value()) {
rapidjson::Document param_json(rapidjson::kObjectType,
&json.GetAllocator());
param_json.AddMember("name", param.name, json.GetAllocator());
param_json.AddMember("type", to_json(param.type, &json.GetAllocator()),
json.GetAllocator());
params_json.PushBack(param_json, json.GetAllocator());
}
}
json.AddMember("params", params_json, json.GetAllocator());
rapidjson::Document returns_json(rapidjson::kArrayType, &json.GetAllocator());
if (returns.has_value()) {
for (auto& ret : returns.value()) {
rapidjson::Document ret_json(rapidjson::kObjectType,
&json.GetAllocator());
ret_json.AddMember("name", ret.name, json.GetAllocator());
ret_json.AddMember("type", to_json(ret.type, &json.GetAllocator()),
json.GetAllocator());
returns_json.PushBack(ret_json, json.GetAllocator());
}
}
json.AddMember("returns", returns_json, json.GetAllocator());
if (library.has_value()) {
json.AddMember("library", library.value(), json.GetAllocator());
}
rapidjson::Document option_json(rapidjson::kObjectType, &json.GetAllocator());
if (option.has_value()) {
for (auto& opt : option.value()) {
option_json.AddMember(
rapidjson::Value(opt.first.c_str(), json.GetAllocator()),
rapidjson::Value(opt.second.c_str(), json.GetAllocator()),
json.GetAllocator());
}
}
json.AddMember("option", option_json, json.GetAllocator());
if (enable.has_value()) {
json.AddMember("enable", enable.value(), json.GetAllocator());
}
auto dumped = rapidjson_stringify(json);
VLOG(10) << "dump: " << dumped;
return dumped;
}
// Factory for the request that registers a job which has just started.
//
// The returned request carries the supplied graph id, process id, log path
// and job type; its status is JobStatus::kRunning and its start time is the
// value returned by GetCurrentTimeStamp() at the moment of the call.
CreateJobMetaRequest CreateJobMetaRequest::NewRunning(
    const GraphId& graph_id, int32_t process_id, const std::string& log_path,
    const std::string& type) {
  CreateJobMetaRequest running_job;

  // State that is fixed for every freshly started job.
  running_job.status = JobStatus::kRunning;
  running_job.start_time = GetCurrentTimeStamp();

  // Caller-supplied description of the job.
  running_job.graph_id = graph_id;
  running_job.process_id = process_id;
  running_job.type = type;
  running_job.log_path = log_path;

  return running_job;
}
std::string CreateJobMetaRequest::ToString() const {
rapidjson::Document json(rapidjson::kObjectType);
json.AddMember("detail", rapidjson::kObjectType, json.GetAllocator());
json["detail"].AddMember("graph_id", graph_id, json.GetAllocator());
json["detail"].AddMember("process_id", process_id, json.GetAllocator());
json.AddMember("start_time", start_time, json.GetAllocator());
json.AddMember("status", std::to_string(status), json.GetAllocator());
json.AddMember("log_path", log_path, json.GetAllocator());
json.AddMember("type", type, json.GetAllocator());
return rapidjson_stringify(json);
}
// Factory for the update that marks a job as cancelled: status becomes
// JobStatus::kCancelled and end_time is set to GetCurrentTimeStamp().
// All other fields keep their default-constructed state.
UpdateJobMetaRequest UpdateJobMetaRequest::NewCancel() {
  UpdateJobMetaRequest cancellation;
  cancellation.end_time = GetCurrentTimeStamp();
  cancellation.status = JobStatus::kCancelled;
  return cancellation;
}
// Factory for the update that records a job's completion.
//
// rc: return code of the finished job; 0 maps to JobStatus::kSuccess and any
//     other value maps to JobStatus::kFailed.
// The end_time of the returned request is set to GetCurrentTimeStamp().
UpdateJobMetaRequest UpdateJobMetaRequest::NewFinished(int rc) {
  UpdateJobMetaRequest completion;
  completion.status = (rc == 0) ? JobStatus::kSuccess : JobStatus::kFailed;
  completion.end_time = GetCurrentTimeStamp();
  return completion;
}
// Converts the textual form of a job status into the JobStatus enum.
//
// Recognized (case-sensitive) inputs are "RUNNING", "SUCCESS", "FAILED" and
// "CANCELLED". Any other input is logged as an error and mapped to
// JobStatus::kUnknown.
JobStatus parseFromString(const std::string& status_string) {
  struct KnownStatus {
    const char* text;
    JobStatus value;
  };
  static const KnownStatus kKnownStatuses[] = {
      {"RUNNING", JobStatus::kRunning},
      {"SUCCESS", JobStatus::kSuccess},
      {"FAILED", JobStatus::kFailed},
      {"CANCELLED", JobStatus::kCancelled},
  };

  for (const auto& known : kKnownStatuses) {
    if (status_string == known.text) {
      return known.value;
    }
  }

  LOG(ERROR) << "Unknown job status: " << status_string;
  return JobStatus::kUnknown;
}
std::string GraphStatistics::ToJson() const {
rapidjson::Document json(rapidjson::kObjectType);
json.AddMember("total_vertex_count", total_vertex_count, json.GetAllocator());
json.AddMember("total_edge_count", total_edge_count, json.GetAllocator());
json.AddMember("vertex_type_statistics", rapidjson::kArrayType,
json.GetAllocator());
for (auto& type_stat : vertex_type_statistics) {
rapidjson::Document type_stat_json(rapidjson::kObjectType,
&json.GetAllocator());
type_stat_json.AddMember("type_id", std::get<0>(type_stat),
json.GetAllocator());
type_stat_json.AddMember("type_name", std::get<1>(type_stat),
json.GetAllocator());
type_stat_json.AddMember("count", std::get<2>(type_stat),
json.GetAllocator());
json["vertex_type_statistics"].PushBack(type_stat_json,
json.GetAllocator());
}
json.AddMember("edge_type_statistics", rapidjson::kArrayType,
json.GetAllocator());
for (auto& type_stat : edge_type_statistics) {
rapidjson::Document type_stat_json(rapidjson::kObjectType,
&json.GetAllocator());
type_stat_json.AddMember("type_id", std::get<0>(type_stat),
json.GetAllocator());
type_stat_json.AddMember("type_name", std::get<1>(type_stat),
json.GetAllocator());
type_stat_json.AddMember("vertex_type_pair_statistics",
rapidjson::kArrayType, json.GetAllocator());
for (auto& pair_stat : std::get<2>(type_stat)) {
rapidjson::Document pair_stat_json(rapidjson::kObjectType,
&json.GetAllocator());
pair_stat_json.AddMember("source_vertex", std::get<0>(pair_stat),
json.GetAllocator());
pair_stat_json.AddMember("destination_vertex", std::get<1>(pair_stat),
json.GetAllocator());
pair_stat_json.AddMember("count", std::get<2>(pair_stat),
json.GetAllocator());
type_stat_json["vertex_type_pair_statistics"].PushBack(
pair_stat_json, json.GetAllocator());
}
json["edge_type_statistics"].PushBack(type_stat_json, json.GetAllocator());
}
return rapidjson_stringify(json);
}
// Parses `json_str` as JSON and builds a GraphStatistics from it by
// delegating to the rapidjson::Value overload of FromJson.
//
// Returns an INTERNAL_ERROR status (and logs the offending text) when
// `json_str` is not syntactically valid JSON.
Result<GraphStatistics> GraphStatistics::FromJson(const std::string& json_str) {
  rapidjson::Document parsed(rapidjson::kObjectType);
  parsed.Parse(json_str.c_str());
  if (!parsed.HasParseError()) {
    return GraphStatistics::FromJson(parsed);
  }

  LOG(ERROR) << "Invalid json string: " << json_str;
  return Result<GraphStatistics>(Status(
      StatusCode::INTERNAL_ERROR,
      "Invalid json string when parsing graph statistics : " + json_str));
}
// Builds a GraphStatistics from an already-parsed JSON value.
//
// Expected layout (the one written by GraphStatistics::ToJson):
//   {
//     "total_vertex_count": <int64>,
//     "total_edge_count": <int64>,
//     "vertex_type_statistics":
//         [{"type_id": <int>, "type_name": <string>, "count": <int64>}, ...],
//     "edge_type_statistics":
//         [{"type_id": <int>, "type_name": <string>,
//           "vertex_type_pair_statistics":
//               [{"source_vertex": <string>, "destination_vertex": <string>,
//                 "count": <int64>}, ...]}, ...]
//   }
//
// Every member is checked for presence and type before it is read. The
// rapidjson accessors used below (operator[], GetInt, GetInt64, GetString,
// GetArray) only RAPIDJSON_ASSERT their preconditions, so reading a missing
// or mistyped member would otherwise abort or yield unspecified values
// instead of a reportable error.
//
// Returns the populated statistics on success, or an INTERNAL_ERROR status
// (also logged) when `json` does not match the layout above.
Result<GraphStatistics> GraphStatistics::FromJson(
    const rapidjson::Value& json) {
  // Single place that produces the error result for malformed input.
  const auto malformed = [](const std::string& what) {
    LOG(ERROR) << "Invalid graph statistics json: " << what;
    return Result<GraphStatistics>(
        Status(StatusCode::INTERNAL_ERROR,
               "Invalid graph statistics json: " + what));
  };

  // True iff `obj` is an object that has member `key` and that member
  // satisfies the given rapidjson type predicate (e.g. &Value::IsInt64).
  using TypeCheck = bool (rapidjson::Value::*)() const;
  const auto has = [](const rapidjson::Value& obj, const char* key,
                      TypeCheck check) {
    if (!obj.IsObject()) {
      return false;
    }
    const auto it = obj.FindMember(key);
    return it != obj.MemberEnd() && (it->value.*check)();
  };

  if (!has(json, "total_vertex_count", &rapidjson::Value::IsInt64) ||
      !has(json, "total_edge_count", &rapidjson::Value::IsInt64) ||
      !has(json, "vertex_type_statistics", &rapidjson::Value::IsArray) ||
      !has(json, "edge_type_statistics", &rapidjson::Value::IsArray)) {
    return malformed(
        "expect an object with integer \"total_vertex_count\" and "
        "\"total_edge_count\" and array \"vertex_type_statistics\" and "
        "\"edge_type_statistics\"");
  }

  GraphStatistics stat;
  stat.total_vertex_count = json["total_vertex_count"].GetInt64();
  stat.total_edge_count = json["total_edge_count"].GetInt64();

  for (auto& type_stat : json["vertex_type_statistics"].GetArray()) {
    if (!has(type_stat, "type_id", &rapidjson::Value::IsInt) ||
        !has(type_stat, "type_name", &rapidjson::Value::IsString) ||
        !has(type_stat, "count", &rapidjson::Value::IsInt64)) {
      return malformed(
          "each entry of \"vertex_type_statistics\" must have integer "
          "\"type_id\", string \"type_name\" and integer \"count\"");
    }
    stat.vertex_type_statistics.push_back({type_stat["type_id"].GetInt(),
                                           type_stat["type_name"].GetString(),
                                           type_stat["count"].GetInt64()});
  }

  for (auto& type_stat : json["edge_type_statistics"].GetArray()) {
    if (!has(type_stat, "type_id", &rapidjson::Value::IsInt) ||
        !has(type_stat, "type_name", &rapidjson::Value::IsString) ||
        !has(type_stat, "vertex_type_pair_statistics",
             &rapidjson::Value::IsArray)) {
      return malformed(
          "each entry of \"edge_type_statistics\" must have integer "
          "\"type_id\", string \"type_name\" and array "
          "\"vertex_type_pair_statistics\"");
    }

    std::vector<typename GraphStatistics::vertex_type_pair_statistic>
        vertex_type_pair_statistics;
    for (auto& pair : type_stat["vertex_type_pair_statistics"].GetArray()) {
      if (!has(pair, "source_vertex", &rapidjson::Value::IsString) ||
          !has(pair, "destination_vertex", &rapidjson::Value::IsString) ||
          !has(pair, "count", &rapidjson::Value::IsInt64)) {
        return malformed(
            "each entry of \"vertex_type_pair_statistics\" must have string "
            "\"source_vertex\" and \"destination_vertex\" and integer "
            "\"count\"");
      }
      vertex_type_pair_statistics.push_back(
          {pair["source_vertex"].GetString(),
           pair["destination_vertex"].GetString(), pair["count"].GetInt64()});
    }
    stat.edge_type_statistics.push_back({type_stat["type_id"].GetInt(),
                                         type_stat["type_name"].GetString(),
                                         vertex_type_pair_statistics});
  }
  return stat;
}
} // namespace gs
// NOTE(review): these overloads live in namespace std so that callers can
// write std::to_string(status) / stream a gs::JobStatus directly. The C++
// standard does not sanction adding function overloads to namespace std;
// consider migrating callers to equivalents declared in namespace gs.
namespace std {

// Returns the upper-case textual name of `status`. Values that match none of
// the enumerators below are rendered as "UNKNOWN".
std::string to_string(const gs::JobStatus& status) {
  const char* text = "UNKNOWN";
  switch (status) {
  case gs::JobStatus::kRunning:
    text = "RUNNING";
    break;
  case gs::JobStatus::kSuccess:
    text = "SUCCESS";
    break;
  case gs::JobStatus::kFailed:
    text = "FAILED";
    break;
  case gs::JobStatus::kCancelled:
    text = "CANCELLED";
    break;
  case gs::JobStatus::kUnknown:
    break;
  }
  return text;
}

// Streams the textual name of `status` (see to_string above) and returns the
// stream to allow chaining.
std::ostream& operator<<(std::ostream& os, const gs::JobStatus& status) {
  return os << to_string(status);
}

}  // namespace std