flex/engines/http_server/workdir_manipulator.cc (1,141 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "flex/engines/http_server/workdir_manipulator.h"

#include <atomic>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <rapidjson/document.h>
#include "flex/engines/http_server/codegen_proxy.h"
#include "service_utils.h"

#include <boost/uuid/uuid.hpp>             // uuid class
#include <boost/uuid/uuid_generators.hpp>  // generators
#include <boost/uuid/uuid_io.hpp>          // streaming operators etc.

// Checks whether `field` is present in the json object `json`; if it is not,
// returns an INVALID_ARGUMENT gs::Result<seastar::sstring> from the enclosing
// function. Wrapped in do/while(0) so a use followed by `;` behaves as a single
// statement (safe inside an unbraced if/else).
#define CHECK_JSON_FIELD(json, field)                                   \
  do {                                                                  \
    if (!(json).HasMember(field)) {                                     \
      return gs::Result<seastar::sstring>(gs::Status(                   \
          gs::StatusCode::INVALID_ARGUMENT,                             \
          "Procedure " + std::string(field) + " is not specified"));    \
    }                                                                   \
  } while (0)

namespace server {

// Root directory of all graph data, plugins, logs and uploads.
std::string WorkDirManipulator::workspace = ".";  // default to .

void WorkDirManipulator::SetWorkspace(const std::string& path) {
  workspace = path;
}

std::string WorkDirManipulator::GetWorkspace() { return workspace; }

// Parses `json_str` (JSON is accepted by the YAML parser) and forwards to the
// YAML::Node overload. Returns INVALID_SCHEMA when parsing fails.
gs::Result<seastar::sstring> WorkDirManipulator::DumpGraphSchema(
    const gs::GraphId& graph_id, const std::string& json_str) {
  YAML::Node yaml_node;
  try {
    yaml_node = YAML::Load(json_str);
  } catch (const std::exception& e) {
    return gs::Result<seastar::sstring>(gs::Status(
        gs::StatusCode::INVALID_SCHEMA,
        "Fail to parse graph schema: " + json_str + ", error: " + e.what()));
  }
  return DumpGraphSchema(graph_id, yaml_node);
}

// GraphName can be specified in the config file or in the argument.
// Writes `yaml_config` as the schema of graph `graph_id`.
//
// Fails with INVALID_SCHEMA when the config has no `name` field,
// ALREADY_EXISTS when `graph_id` already has a schema file, the status
// reported by gs::Schema::LoadFromYamlNode() when the config is not a valid
// schema, and PERMISSION_DENIED when the schema file cannot be written.
gs::Result<seastar::sstring> WorkDirManipulator::DumpGraphSchema(
    const gs::GraphId& graph_id, const YAML::Node& yaml_config) {
  if (!yaml_config["name"]) {
    return gs::Result<seastar::sstring>(
        gs::Status(gs::StatusCode::INVALID_SCHEMA,
                   "Graph name is not specified"),
        seastar::sstring("Graph name is not specified"));
  }
  // Never overwrite an existing graph.
  if (is_graph_exist(graph_id)) {
    return gs::Result<seastar::sstring>(
        gs::Status(gs::StatusCode::ALREADY_EXISTS, "Graph already exists"),
        seastar::sstring("graph " + graph_id + " already exists"));
  }
  // Validate before writing anything. The failure *status* must be returned:
  // wrapping only the message in a Result would yield an OK result.
  auto schema_result = gs::Schema::LoadFromYamlNode(yaml_config);
  if (!schema_result.ok()) {
    return gs::Result<seastar::sstring>(schema_result.status());
  }
  auto dump_res = dump_graph_schema(yaml_config, graph_id);
  if (!dump_res.ok()) {
    return gs::Result<seastar::sstring>(gs::Status(
        gs::StatusCode::PERMISSION_DENIED,
        "Fail to dump graph schema: " + dump_res.status().error_message()));
  }
  VLOG(10) << "Successfully dump graph schema to file: " << graph_id << ", "
           << GetGraphSchemaPath(graph_id);
  return gs::Result<seastar::sstring>(
      seastar::sstring("successfully created graph "));
}

// Rewrites the schema file of an existing graph from `graph_meta`, appending
// every *enabled* plugin in `plugin_metas` to `stored_procedures`.
//
// Returns NOT_FOUND when the graph has no schema file yet, INTERNAL_ERROR when
// graph_meta.ToJson() cannot be parsed, PERMISSION_DENIED when writing fails.
gs::Result<bool> WorkDirManipulator::DumpGraphSchema(
    const gs::GraphMeta& graph_meta,
    const std::vector<gs::PluginMeta>& plugin_metas) {
  auto graph_id = graph_meta.id;
  if (!is_graph_exist(graph_id)) {
    return gs::Result<bool>(
        gs::Status(gs::StatusCode::NOT_FOUND, "Graph not exists: " + graph_id),
        false);
  }
  auto graph_schema = graph_meta.ToJson();
  YAML::Node yaml_node;
  try {
    yaml_node = YAML::Load(graph_schema);
  } catch (const std::exception& e) {
    return gs::Result<bool>(
        gs::Status(gs::StatusCode::INTERNAL_ERROR,
                   "Fail to parse graph schema: " + graph_schema +
                       ", error: " + e.what()),
        false);
  }
  if (!yaml_node["stored_procedures"]) {
    yaml_node["stored_procedures"] = YAML::Node(YAML::NodeType::Sequence);
  }
  if (!yaml_node["version"]) {
    yaml_node["version"] = "v0.1";
  }
  // YAML::Node has reference semantics: pushing into `procedures_node`
  // mutates yaml_node["stored_procedures"] in place.
  auto procedures_node = yaml_node["stored_procedures"];
  for (const auto& plugin : plugin_metas) {
    if (!plugin.enable) {
      VLOG(10) << "Plugin is not enabled: " << plugin.name;
      continue;
    }
    YAML::Node plugin_node;
    plugin_node["name"] = plugin.name;
    plugin_node["library"] = plugin.library;
    // quote the description, since it may contain space.
    plugin_node["description"] = "\"" + plugin.description + "\"";
    if (!plugin.params.empty()) {
      YAML::Node params_node;
      for (const auto& param : plugin.params) {
        // convert to YAML::Node via the encode function
        params_node.push_back(YAML::convert<gs::Parameter>::encode(param));
      }
      plugin_node["params"] = params_node;
    }
    if (!plugin.returns.empty()) {
      YAML::Node returns_node;
      for (const auto& ret : plugin.returns) {
        // convert to YAML::Node via the encode function
        returns_node.push_back(YAML::convert<gs::Parameter>::encode(ret));
      }
      plugin_node["returns"] = returns_node;
    }
    procedures_node.push_back(plugin_node);
    VLOG(10) << "Add enabled plugin: " << plugin.name;
  }
  yaml_node["stored_procedures"] = procedures_node;
  auto dump_res = dump_graph_schema(yaml_node, graph_id);
  if (!dump_res.ok()) {
    return gs::Result<bool>(gs::Status(gs::StatusCode::PERMISSION_DENIED,
                                       "Fail to dump graph schema: " +
                                           dump_res.status().error_message()),
                            false);
  }
  VLOG(10) << "Successfully dump graph schema to file: " << graph_id << ", "
           << GetGraphSchemaPath(graph_id);
  return gs::Result<bool>(true);
}

// Returns the graph's schema file converted to a JSON string.
// NOT_FOUND when the graph, its schema file, or the conversion fails.
gs::Result<seastar::sstring> WorkDirManipulator::GetGraphSchemaString(
    const std::string& graph_name) {
  if (!is_graph_exist(graph_name)) {
    return gs::Result<seastar::sstring>(
        gs::Status(gs::StatusCode::NOT_FOUND,
                   "Graph not exists: " + graph_name),
        seastar::sstring());
  }
  auto schema_file = GetGraphSchemaPath(graph_name);
  if (!std::filesystem::exists(schema_file)) {
    return gs::Result<seastar::sstring>(gs::Status(
        gs::StatusCode::NOT_FOUND,
        "Graph schema file is expected, but not exists: " + schema_file));
  }
  // read schema file and output to string
  auto schema_str_res = gs::get_json_string_from_yaml(schema_file);
  if (!schema_str_res.ok()) {
    return gs::Result<seastar::sstring>(
        gs::Status(gs::StatusCode::NOT_FOUND,
                   "Failed to read schema file: " + schema_file +
                       ", error: " + schema_str_res.status().error_message()));
  }
  return gs::Result<seastar::sstring>(schema_str_res.value());
}

// Loads the graph's schema file into a gs::Schema.
// NOT_FOUND when the graph or schema file is missing; the loader's status on a
// load failure; INTERNAL_ERROR when loading throws.
gs::Result<gs::Schema> WorkDirManipulator::GetGraphSchema(
    const std::string& graph_name) {
  LOG(INFO) << "Get graph schema: " << graph_name;
  gs::Schema schema;
  if (!is_graph_exist(graph_name)) {
    return gs::Result<gs::Schema>(gs::Status(
        gs::StatusCode::NOT_FOUND, "Graph not exists: " + graph_name));
  }
  auto schema_file = GetGraphSchemaPath(graph_name);
  if (!std::filesystem::exists(schema_file)) {
    return gs::Result<gs::Schema>(gs::Status(
        gs::StatusCode::NOT_FOUND,
        "Graph schema file is expected, but not exists: " + schema_file));
  }
  try {
    LOG(INFO) << "Load graph schema from file: " << schema_file;
    auto schema_res = gs::Schema::LoadFromYaml(schema_file);
    if (!schema_res.ok()) {
      return gs::Result<gs::Schema>(schema_res.status(), schema);
    }
    schema = schema_res.value();
  } catch (const std::exception& e) {
    LOG(ERROR) << "Fail to load graph schema: " << schema_file
               << ", error: " << e.what();
    return gs::Result<gs::Schema>(
        gs::Status(gs::StatusCode::INTERNAL_ERROR,
                   "Fail to load graph schema: " + schema_file +
                       ", for graph: " + graph_name + ", error: " + e.what()));
  }
  return gs::Result<gs::Schema>(schema);
}

// Returns the graph's indices directory, creating it if absent.
// NOT_FOUND when the graph does not exist; PERMISSION_DENIED when the
// directory cannot be created.
gs::Result<seastar::sstring> WorkDirManipulator::GetDataDirectory(
    const std::string& graph_name) {
  if (!is_graph_exist(graph_name)) {
    return gs::Result<seastar::sstring>(
        gs::Status(gs::StatusCode::NOT_FOUND,
                   "Graph not exists: " + graph_name),
        seastar::sstring());
  }
  auto data_dir = GetGraphIndicesDir(graph_name);
  if (!std::filesystem::exists(data_dir)) {
    // error_code overload: report failure instead of throwing.
    std::error_code ec;
    std::filesystem::create_directories(data_dir, ec);
    if (ec) {
      return gs::Result<seastar::sstring>(gs::Status(
          gs::StatusCode::PERMISSION_DENIED,
          "Fail to create data directory: " + data_dir +
              ", error: " + ec.message()));
    }
  }
  return gs::Result<seastar::sstring>(data_dir);
}
gs::Result<seastar::sstring> WorkDirManipulator::ListGraphs() { // list all graph schema files under data_workspace YAML::Node yaml_list; auto data_workspace = workspace + "/" + DATA_DIR_NAME; for (const auto& entry : std::filesystem::directory_iterator(data_workspace)) { if (entry.is_directory()) { auto graph_name = entry.path().filename().string(); // visit graph.yaml under data/graph_name/graph.yaml auto graph_path = GetGraphSchemaPath(graph_name); VLOG(10) << "Check graph path: " << graph_path; if (!std::filesystem::exists(graph_path)) { continue; } try { auto graph_schema_str_res = YAML::LoadFile(graph_path); yaml_list.push_back(graph_schema_str_res); } catch (const std::exception& e) { LOG(ERROR) << "Fail to parse graph schema file: " << graph_path << ", error: " << e.what(); } } } auto json_str = gs::get_json_string_from_yaml(yaml_list); if (!json_str.ok()) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to convert yaml to json: " + json_str.status().error_message())); } return gs::Result<seastar::sstring>(json_str.value()); } gs::Result<seastar::sstring> WorkDirManipulator::DeleteGraph( const std::string& graph_name) { // remove the graph directory try { auto graph_path = get_graph_dir(graph_name); std::filesystem::remove_all(graph_path); } catch (const std::exception& e) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to remove graph directory: " + graph_name), seastar::sstring("Fail to remove graph directory: " + graph_name)); } return gs::Result<seastar::sstring>( gs::Status::OK(), "Successfully delete graph: " + graph_name); } gs::Result<seastar::sstring> WorkDirManipulator::LoadGraph( const std::string& graph_name, const YAML::Node& yaml_node, int32_t loading_thread_num, const std::string& dst_indices_dir, std::shared_ptr<gs::IGraphMetaStore> metadata_store) { // First check whether graph exists if (!is_graph_exist(graph_name)) { return 
gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph not exists: " + graph_name)); } // No need to check whether graph exists, because it is checked in LoadGraph // First load schema auto schema_file = GetGraphSchemaPath(graph_name); gs::Schema schema; try { auto schema_res = gs::Schema::LoadFromYaml(schema_file); if (!schema_res.ok()) { return gs::Result<seastar::sstring>(schema_res.status()); } schema = schema_res.value(); } catch (const std::exception& e) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to load graph schema: " + schema_file + ", for graph: " + graph_name)); } VLOG(1) << "Loaded schema, vertex label num: " << schema.vertex_label_num() << ", edge label num: " << schema.edge_label_num(); auto loading_config_res = gs::LoadingConfig::ParseFromYamlNode(schema, yaml_node); if (!loading_config_res.ok()) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, loading_config_res.status().error_message())); } // dump to file auto loading_config = loading_config_res.value(); std::string temp_file_name = graph_name + "_bulk_loading_config.yaml"; auto temp_file_path = TMP_DIR + "/" + temp_file_name; RETURN_IF_NOT_OK(dump_yaml_to_file(yaml_node, temp_file_path)); auto loading_config_json_str_res = gs::get_json_string_from_yaml(yaml_node); if (!loading_config_json_str_res.ok()) { return loading_config_json_str_res.status(); } return load_graph_impl(temp_file_path, graph_name, loading_thread_num, dst_indices_dir, loading_config_json_str_res.value(), metadata_store); } gs::Result<seastar::sstring> WorkDirManipulator::GetProceduresByGraphName( const std::string& graph_name) { if (!is_graph_exist(graph_name)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph not exists: " + graph_name)); } // get graph schema file, and get procedure lists. 
auto schema_file = GetGraphSchemaPath(graph_name); if (!std::filesystem::exists(schema_file)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph schema file is expected, but not exists: " + schema_file)); } YAML::Node schema_node; try { schema_node = YAML::LoadFile(schema_file); } catch (const std::exception& e) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to load graph schema: " + schema_file + ", error: " + e.what())); } if (schema_node["stored_procedures"]) { auto procedure_node = schema_node["stored_procedures"]; if (procedure_node["enable_lists"]) { auto procedures = procedure_node["enable_lists"]; if (procedures.IsSequence()) { std::vector<std::string> procedure_list; for (const auto& procedure : procedures) { procedure_list.push_back(procedure.as<std::string>()); } LOG(INFO) << "Enabled procedures found: " << graph_name << ", schema file: " << schema_file << ", procedure list: " << gs::to_string(procedure_list); return get_all_procedure_yamls(graph_name, procedure_list); } } } LOG(INFO) << "No enabled procedures found: " << graph_name << ", schema file: " << schema_file; return get_all_procedure_yamls( graph_name); // should be all procedures, not enabled only. } gs::Result<seastar::sstring> WorkDirManipulator::GetProcedureByGraphAndProcedureName( const std::string& graph_id, const std::string& procedure_id) { if (!is_graph_exist(graph_id)) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::NOT_FOUND, "Graph not exists: " + graph_id)); } // get graph schema file, and get procedure lists. 
auto schema_file = GetGraphSchemaPath(graph_id); if (!std::filesystem::exists(schema_file)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph schema file is expected, but not exists: " + schema_file)); } YAML::Node schema_node; try { schema_node = YAML::LoadFile(schema_file); } catch (const std::exception& e) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to load graph schema: " + schema_file + ", error: " + e.what())); } // get yaml file in plugin directory. auto plugin_dir = GetGraphPluginDir(graph_id); if (!std::filesystem::exists(plugin_dir)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph plugin directory is expected, but not exists: " + plugin_dir)); } auto plugin_file = plugin_dir + "/" + procedure_id + ".yaml"; if (!std::filesystem::exists(plugin_file)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "plugin not found " + plugin_file)); } // check whether procedure is enabled. 
YAML::Node plugin_node; try { plugin_node = YAML::LoadFile(plugin_file); } catch (const std::exception& e) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to load graph plugin: " + plugin_file + ", error: " + e.what())); } return gs::Result<seastar::sstring>( gs::get_json_string_from_yaml(plugin_node).value()); } seastar::future<seastar::sstring> WorkDirManipulator::CreateProcedure( const std::string& graph_name, const std::string& plugin_id, const rapidjson::Value& json, const std::string& engine_config_path) { LOG(INFO) << "Create procedure: " << plugin_id << " on graph: " << graph_name; if (!is_graph_exist(graph_name)) { return seastar::make_ready_future<seastar::sstring>("Graph not exists: " + graph_name); } // check procedure exits auto plugin_dir = GetGraphPluginDir(graph_name); if (!std::filesystem::exists(plugin_dir)) { try { std::filesystem::create_directory(plugin_dir); } catch (const std::exception& e) { return seastar::make_ready_future<seastar::sstring>( "Fail to create plugin directory: " + plugin_dir); } } // load parameter as json, and do some check // check required fields is give. auto res = create_procedure_sanity_check(json); if (!res.ok()) { return seastar::make_exception_future<seastar::sstring>( res.status().error_message()); } LOG(INFO) << "Pass sanity check for procedure: " << json["name"].GetString(); // get procedure name // check whether procedure already exists. 
auto plugin_file = plugin_dir + "/" + plugin_id + ".yaml"; if (std::filesystem::exists(plugin_file)) { return seastar::make_exception_future<seastar::sstring>( "Procedure already exists: " + plugin_id); } return generate_procedure(graph_name, plugin_id, json, engine_config_path); } gs::Result<seastar::sstring> WorkDirManipulator::DeleteProcedure( const std::string& graph_name, const std::string& procedure_name) { LOG(INFO) << "Delete procedure: " << procedure_name << " on graph: " << graph_name; if (!is_graph_exist(graph_name)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph not exists: " + graph_name)); } // remove the plugin file and dynamic lib auto plugin_dir = GetGraphPluginDir(graph_name); if (!std::filesystem::exists(plugin_dir)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph plugin directory is expected, but not exists: " + plugin_dir)); } auto plugin_file = plugin_dir + "/" + procedure_name + ".yaml"; if (!std::filesystem::exists(plugin_file)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "plugin not found " + plugin_file)); } try { std::filesystem::remove(plugin_file); } catch (const std::exception& e) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to remove plugin file: " + plugin_file + ", error: " + e.what())); } auto plugin_lib = plugin_dir + "/lib" + procedure_name + ".so"; if (!std::filesystem::exists(plugin_lib)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "plugin lib not found " + plugin_lib)); } try { std::filesystem::remove(plugin_lib); } catch (const std::exception& e) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to remove plugin lib: " + plugin_lib + ", error: " + e.what())); } return gs::Result<seastar::sstring>(gs::Status::OK(), "Successfully delete procedure"); } // we only support update the description and enable status. 
gs::Result<seastar::sstring> WorkDirManipulator::UpdateProcedure( const std::string& graph_name, const std::string& procedure_name, const std::string& parameters) { // check procedure exits. auto plugin_dir = GetGraphPluginDir(graph_name); if (!std::filesystem::exists(plugin_dir)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph plugin directory is expected, but not exists: " + plugin_dir)); } auto plugin_file = plugin_dir + "/" + procedure_name + ".yaml"; if (!std::filesystem::exists(plugin_file)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "plugin not found " + plugin_file)); } // load parameter as json, and do some check rapidjson::Document json; if (json.Parse(parameters.c_str()).HasParseError()) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to parse parameter as json: " + parameters)); } VLOG(1) << "Successfully parse json parameters: " << gs::rapidjson_stringify(json); // load plugin_file as yaml YAML::Node plugin_node; try { plugin_node = YAML::LoadFile(plugin_file); } catch (const std::exception& e) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to load graph plugin: " + plugin_file + ", error: " + e.what())); } // update description and enable status. if (json.HasMember("description")) { auto& new_description = json["description"]; VLOG(10) << "Update description: " << gs::jsonToString(new_description); // update description // quote the description, since it may contain space. 
plugin_node["description"] = "\"" + std::string(new_description.GetString()) + "\""; } bool enabled; if (json.HasMember("enable")) { VLOG(1) << "Enable is specified in the parameter:" << gs::jsonToString(json["enable"]); if (json["enable"].IsBool()) { enabled = json["enable"].GetBool(); } else if (json["enable"].IsString()) { std::string enable_str = json["enable"].GetString(); if (enable_str == "true" || enable_str == "True" || enable_str == "TRUE") { enabled = true; } else { enabled = false; } } else { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to parse enable field: " + gs::jsonToString(json["enable"]))); } plugin_node["enable"] = enabled; } // dump to file. auto dump_res = dump_yaml_to_file(plugin_node, plugin_file); if (!dump_res.ok()) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to dump plugin yaml to file: " + plugin_file + ", error: " + dump_res.status().error_message())); } VLOG(10) << "Dump plugin yaml to file: " << plugin_file; // if enable is specified in the parameter, update graph schema file. 
if (enabled) { return enable_procedure_on_graph(graph_name, procedure_name); } else { return disable_procedure_on_graph(graph_name, procedure_name); } } gs::Result<seastar::sstring> WorkDirManipulator::GetProcedureLibPath( const std::string& graph_name, const std::string& procedure_name) { if (!is_graph_exist(graph_name)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph not exists: " + graph_name)); } // get the plugin dir and append procedure_name auto plugin_dir = GetGraphPluginDir(graph_name); if (!std::filesystem::exists(plugin_dir)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph plugin directory is expected, but not exists: " + plugin_dir)); } auto plugin_so_path = plugin_dir + "/lib" + procedure_name + ".so"; if (!std::filesystem::exists(plugin_so_path)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph plugin so file is expected, but not exists: " + plugin_so_path)); } return gs::Result<seastar::sstring>(plugin_so_path); } std::string WorkDirManipulator::GetGraphSchemaPath( const std::string& graph_name) { return get_graph_dir(graph_name) + "/" + GRAPH_SCHEMA_FILE_NAME; } std::string WorkDirManipulator::GetGraphDir(const std::string& graph_name) { return get_graph_dir(graph_name); } std::string WorkDirManipulator::get_graph_lock_file( const std::string& graph_name) { return get_graph_dir(graph_name) + "/" + LOCK_FILE; } std::string WorkDirManipulator::GetGraphIndicesDir( const std::string& graph_name) { return get_graph_dir(graph_name) + "/" + GRAPH_INDICES_DIR_NAME; } std::string WorkDirManipulator::GetLogDir() { auto log_dir = workspace + "/logs/"; if (!std::filesystem::exists(log_dir)) { std::filesystem::create_directory(log_dir); } return log_dir; } std::string WorkDirManipulator::GetUploadDir() { auto upload_dir = workspace + "/" + UPLOAD_DIR; if (!std::filesystem::exists(upload_dir)) { std::filesystem::create_directory(upload_dir); } return 
upload_dir; } std::string WorkDirManipulator::GetCompilerLogFile() { // with timestamp auto time_stamp = std::to_string( std::chrono::system_clock::now().time_since_epoch().count()); auto log_path = GetLogDir() + "/compiler.log"; // Check if the log file exists if (std::filesystem::exists(log_path)) { // Backup the previous log file std::string backupPath = GetLogDir() + "/compiler.log." + time_stamp; std::filesystem::rename(log_path, backupPath); std::cout << "Backed up the previous log file to: " << backupPath << std::endl; } return log_path; } gs::Result<std::string> WorkDirManipulator::CommitTempIndices( const std::string& graph_id) { auto temp_indices_dir = GetTempIndicesDir(graph_id); auto indices_dir = GetGraphIndicesDir(graph_id); if (std::filesystem::exists(indices_dir)) { std::filesystem::remove_all(indices_dir); } if (!std::filesystem::exists(temp_indices_dir)) { return { gs::Status(gs::StatusCode::NOT_FOUND, "Temp indices dir not found")}; } std::filesystem::rename(temp_indices_dir, indices_dir); return indices_dir; } gs::Result<std::string> WorkDirManipulator::CreateFile( const seastar::sstring& raw_file_name, const seastar::sstring& content) { if (content.size() == 0) { return {gs::Status(gs::StatusCode::INVALID_ARGUMENT, "Content is empty")}; } std::string file_name = GetUploadDir() + "/" + raw_file_name.c_str(); // get the timestamp as the file name std::ofstream fout(file_name); if (!fout.is_open()) { return {gs::Status(gs::StatusCode::PERMISSION_DENIED, "Fail to open file")}; } fout << content; fout.close(); LOG(INFO) << "Successfully create file: " << file_name; return gs::Result<std::string>(file_name); } // graph_name can be a path, first try as it is absolute path, or // relative path std::string WorkDirManipulator::get_graph_dir(const std::string& graph_name) { if (std::filesystem::exists(graph_name)) { return graph_name; } return workspace + "/" + DATA_DIR_NAME + "/" + graph_name; } bool WorkDirManipulator::is_graph_exist(const std::string& 
graph_name) { auto graph_path = GetGraphSchemaPath(graph_name); return std::filesystem::exists(graph_path); } std::string WorkDirManipulator::GetTempIndicesDir( const std::string& graph_name) { return get_graph_dir(graph_name) + "/" + GRAPH_TEMP_INDICES_DIR_NAME; } std::string WorkDirManipulator::CleanTempIndicesDir( const std::string& graph_name) { auto temp_indices_dir = GetTempIndicesDir(graph_name); if (std::filesystem::exists(temp_indices_dir)) { std::filesystem::remove_all(temp_indices_dir); } return temp_indices_dir; } std::string WorkDirManipulator::get_graph_indices_file( const std::string& graph_name) { return get_graph_dir(graph_name) + GRAPH_INDICES_DIR_NAME + "/" + GRAPH_INDICES_FILE_NAME; } std::string WorkDirManipulator::GetGraphPluginDir( const std::string& graph_name) { return get_graph_dir(graph_name) + "/" + GRAPH_PLUGIN_DIR_NAME; } bool WorkDirManipulator::ensure_graph_dir_exists( const std::string& graph_name) { auto graph_path = get_graph_dir(graph_name); if (!std::filesystem::exists(graph_path)) { std::filesystem::create_directory(graph_path); } return std::filesystem::exists(graph_path); } gs::Result<std::string> WorkDirManipulator::dump_graph_schema( const YAML::Node& yaml_config, const std::string& graph_name) { if (!ensure_graph_dir_exists(graph_name)) { return {gs::Status(gs::StatusCode::PERMISSION_DENIED, "Fail to create graph directory")}; } auto graph_path = GetGraphSchemaPath(graph_name); VLOG(10) << "Dump graph schema to file: " << graph_path; std::ofstream fout(graph_path); if (!fout.is_open()) { return {gs::Status(gs::StatusCode::PERMISSION_DENIED, "Fail to open file")}; } std::string yaml_str; ASSIGN_AND_RETURN_IF_RESULT_NOT_OK( yaml_str, gs::get_yaml_string_from_yaml_node(yaml_config)); fout << yaml_str; fout.close(); VLOG(10) << "Successfully dump graph schema to file: " << graph_path; return gs::Result<std::string>(gs::Status::OK()); } std::string WorkDirManipulator::get_tmp_bulk_loading_job_log_path( const std::string& 
graph_name) { // file_name = graph_name + current_time + ".log"; auto current_time = std::chrono::system_clock::now(); auto current_time_str = std::chrono::duration_cast<std::chrono::milliseconds>( current_time.time_since_epoch()) .count(); auto file_name = TMP_DIR + "/" + graph_name + "_" + std::to_string(current_time_str) + ".log"; return file_name; } gs::Result<seastar::sstring> WorkDirManipulator::load_graph_impl( const std::string& config_file_path, const std::string& graph_id, int32_t loading_thread_num, const std::string& dst_indices_dir, const std::string& loading_config_json_str, std::shared_ptr<gs::IGraphMetaStore> metadata_store) { auto schema_file = GetGraphSchemaPath(graph_id); auto final_indices_dir = GetGraphIndicesDir(graph_id); auto bulk_loading_job_log = get_tmp_bulk_loading_job_log_path(graph_id); VLOG(10) << "Bulk loading job log: " << bulk_loading_job_log; std::stringstream ss; std::string graph_loader_bin; ASSIGN_AND_RETURN_IF_RESULT_NOT_OK(graph_loader_bin, GetGraphLoaderBin()); ss << graph_loader_bin << " -g " << schema_file << " -l " << config_file_path << " -d " << dst_indices_dir << " -p " << std::to_string(loading_thread_num); auto cmd_string = ss.str(); VLOG(10) << "Call graph_loader: " << cmd_string; gs::JobId job_id; auto fut = hiactor::thread_resource_pool::submit_work( [&job_id, copied_graph_id = graph_id, cmd_string_copied = cmd_string, tmp_indices_dir_copied = dst_indices_dir, final_indices_dir_copied = final_indices_dir, bulk_loading_job_log_copied = bulk_loading_job_log, loading_config_json_str_copied = loading_config_json_str, metadata_store = metadata_store]() mutable { boost::process::child child_handle( cmd_string_copied, boost::process::std_out > bulk_loading_job_log_copied, boost::process::std_err > bulk_loading_job_log_copied); int32_t pid = child_handle.id(); auto create_job_req = gs::CreateJobMetaRequest::NewRunning( copied_graph_id, pid, bulk_loading_job_log_copied, "BULK_LOADING"); auto create_job_res = 
metadata_store->CreateJobMeta(create_job_req); if (!create_job_res.ok()) { LOG(ERROR) << "Fail to create job meta for graph: " << copied_graph_id; return gs::Result<seastar::sstring>(create_job_res.status()); } job_id = create_job_res.value(); LOG(INFO) << "Successfully created job: " << job_id; auto internal_job_id = job_id; LOG(INFO) << "Waiting exiting..."; child_handle.wait(); auto res = child_handle.exit_code(); VLOG(10) << "Graph loader finished, job_id: " << internal_job_id << ", res: " << res; LOG(INFO) << "Updating job meta and graph meta"; auto exit_request = gs::UpdateJobMetaRequest::NewFinished(res); auto update_exit_res = metadata_store->UpdateJobMeta(internal_job_id, exit_request); if (!update_exit_res.ok()) { LOG(ERROR) << "Fail to update job status to finished, job_id: " << internal_job_id; return gs::Result<seastar::sstring>(update_exit_res.status()); } gs::UpdateGraphMetaRequest update_graph_meta_req( gs::GetCurrentTimeStamp(), loading_config_json_str_copied); // Note that this call is also transactional auto update_graph_meta_res = metadata_store->UpdateGraphMeta( copied_graph_id, update_graph_meta_req); if (!update_graph_meta_res.ok()) { LOG(INFO) << "Fail to update graph meta for graph: " << copied_graph_id; WorkDirManipulator::CleanTempIndicesDir(copied_graph_id); return gs::Result<seastar::sstring>( update_graph_meta_res.status()); } LOG(INFO) << "Committing temp indices for graph: " << copied_graph_id; auto commit_res = WorkDirManipulator::CommitTempIndices(copied_graph_id); if (!commit_res.ok()) { LOG(ERROR) << "Fail to commit temp indices for graph: " << copied_graph_id; return gs::Result<seastar::sstring>(commit_res.status()); } return gs::Result<seastar::sstring>( "Finish Loading and commit temp " "indices"); }) .then_wrapped([copied_graph_id = graph_id, metadata_store = metadata_store](auto&& f) { // the destructor of lock_file will unlock the graph. // the destructor of decrementer will decrement the job count. 
metadata_store->UnlockGraphIndices(copied_graph_id); return gs::Result<seastar::sstring>("Finish unlock graph"); }); while (job_id.empty()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } LOG(INFO) << "Successfully created job: " << job_id; return gs::Result<seastar::sstring>(job_id); } gs::Result<seastar::sstring> WorkDirManipulator::create_procedure_sanity_check( const rapidjson::Value& json) { // check required fields is give. CHECK_JSON_FIELD(json, "bound_graph"); CHECK_JSON_FIELD(json, "description"); CHECK_JSON_FIELD(json, "enable"); CHECK_JSON_FIELD(json, "name"); CHECK_JSON_FIELD(json, "query"); CHECK_JSON_FIELD(json, "type"); std::string type = json["type"].GetString(); if (type == "cypher" || type == "CYPHER") { LOG(INFO) << "Cypher procedure, name: " << json["name"].GetString() << ", enable: " << json["enable"].GetBool(); } else if (type == "CPP" || type == "cpp") { LOG(INFO) << "Native procedure, name: " << json["name"].GetString() << ", enable: " << json["enable"].GetBool(); } else { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INVALID_ARGUMENT, "Procedure type is not supported: " + type)); } return gs::Result<seastar::sstring>(gs::Status::OK()); } seastar::future<seastar::sstring> WorkDirManipulator::generate_procedure( const std::string& graph_id, const std::string& plugin_id, const rapidjson::Value& json, const std::string& engine_config_path) { VLOG(10) << "Generate procedure: " << gs::rapidjson_stringify(json); auto codegen_bin = gs::find_codegen_bin(); auto temp_codegen_directory = std::string(server::CodegenProxy::DEFAULT_CODEGEN_DIR); // mkdir -p temp_codegen_directory if (!std::filesystem::exists(temp_codegen_directory)) { std::filesystem::create_directory(temp_codegen_directory); } // dump json["query"] to file. 
auto query = json["query"].GetString(); // auto name = json["name"].GetString(); std::string type = json["type"].GetString(); std::string query_name = plugin_id; std::string procedure_desc; if (json.HasMember("description")) { procedure_desc = json["description"].GetString(); } else { procedure_desc = ""; } std::string query_file; if (type == "cypher" || type == "CYPHER") { query_file = temp_codegen_directory + "/" + plugin_id + ".cypher"; } else if (type == "CPP" || type == "cpp") { query_file = temp_codegen_directory + "/" + plugin_id + ".cc"; } else { return seastar::make_exception_future<seastar::sstring>( "Procedure type is not supported: " + type); } // dump query string as text to query_file try { std::ofstream fout(query_file); if (!fout.is_open()) { return seastar::make_exception_future<seastar::sstring>( std::runtime_error("Fail to open query file: " + query_file)); } fout << query; fout.close(); } catch (const std::exception& e) { return seastar::make_exception_future<seastar::sstring>( std::runtime_error("Fail to dump query to file: " + query_file + ", error: " + std::string(e.what()))); } if (!is_graph_exist(graph_id)) { return seastar::make_exception_future<seastar::sstring>( std::runtime_error("Graph not exists: " + graph_id)); } auto output_dir = GetGraphPluginDir(graph_id); if (!std::filesystem::exists(output_dir)) { std::filesystem::create_directory(output_dir); } auto schema_path = GetGraphSchemaPath(graph_id); return CodegenProxy::CallCodegenCmd( codegen_bin, query_file, query_name, temp_codegen_directory, output_dir, schema_path, engine_config_path, procedure_desc) .then_wrapped([plugin_id = plugin_id, output_dir](auto&& f) { try { auto res = f.get(); if (!res.ok()) { return seastar::make_exception_future<seastar::sstring>( std::runtime_error("Fail to generate procedure, error: " + res.status().error_message())); } std::string so_file; { std::stringstream ss; ss << output_dir << "/lib" << plugin_id << ".so"; so_file = ss.str(); } VLOG(10) << 
"Check so file: " << so_file; if (!std::filesystem::exists(so_file)) { return seastar::make_exception_future<seastar::sstring>( std::runtime_error( "Fail to generate procedure, so file not exists: " + so_file)); } std::string yaml_file; { std::stringstream ss; ss << output_dir << "/" << plugin_id << ".yaml"; yaml_file = ss.str(); } LOG(INFO) << "Check yaml file: " << yaml_file; if (!std::filesystem::exists(yaml_file)) { return seastar::make_exception_future<seastar::sstring>( std::runtime_error( "Fail to generate procedure, yaml file not exists: " + yaml_file)); } return seastar::make_ready_future<seastar::sstring>( seastar::sstring(plugin_id)); } catch (const std::exception& e) { LOG(ERROR) << "Fail to generate procedure, error: " << e.what(); return seastar::make_exception_future<seastar::sstring>( std::runtime_error("Fail to generate procedure, error: " + std::string(e.what()))); } catch (...) { LOG(ERROR) << "Fail to generate procedure, unknown error"; return seastar::make_exception_future<seastar::sstring>( std::runtime_error("Fail to generate procedure, unknown error")); } }); } gs::Result<seastar::sstring> WorkDirManipulator::get_all_procedure_yamls( const std::string& graph_name, const std::vector<std::string>& procedure_names) { YAML::Node yaml_list; auto plugin_dir = GetGraphPluginDir(graph_name); // iterate all .yamls in plugin_dir if (std::filesystem::exists(plugin_dir)) { for (const auto& entry : std::filesystem::directory_iterator(plugin_dir)) { if (entry.path().extension() == ".yaml") { auto procedure_yaml_file = entry.path().string(); try { auto procedure_yaml_node = YAML::LoadFile(procedure_yaml_file); procedure_yaml_node["enabled"] = false; if (!procedure_yaml_node["name"]) { LOG(ERROR) << "Procedure yaml file not contains name: " << procedure_yaml_file; return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Procedure yaml file not contains name: " + procedure_yaml_file)); } auto proc_name = 
procedure_yaml_node["name"].as<std::string>(); if (std::find(procedure_names.begin(), procedure_names.end(), proc_name) != procedure_names.end()) { // only add the procedure yaml file that is in procedure_names. procedure_yaml_node["enabled"] = true; } yaml_list.push_back(procedure_yaml_node); } catch (const std::exception& e) { LOG(ERROR) << "Fail to load procedure yaml file: " << procedure_yaml_file << ", error: " << e.what(); return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to load procedure yaml file: " + procedure_yaml_file + ", error: " + e.what())); } } } } // dump to json auto res = gs::get_json_string_from_yaml(yaml_list); if (!res.ok()) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to dump procedure yaml list to json, error: " + res.status().error_message())); } return gs::Result<seastar::sstring>(std::move(res.value())); } // get all procedures for graph, all set to disabled. gs::Result<seastar::sstring> WorkDirManipulator::get_all_procedure_yamls( const std::string& graph_name) { YAML::Node yaml_list; auto plugin_dir = GetGraphPluginDir(graph_name); // iterate all .yamls in plugin_dir if (std::filesystem::exists(plugin_dir)) { for (const auto& entry : std::filesystem::directory_iterator(plugin_dir)) { if (entry.path().extension() == ".yaml") { auto procedure_yaml_file = entry.path().string(); try { auto procedure_yaml_node = YAML::LoadFile(procedure_yaml_file); procedure_yaml_node["enabled"] = false; yaml_list.push_back(procedure_yaml_node); } catch (const std::exception& e) { LOG(ERROR) << "Fail to load procedure yaml file: " << procedure_yaml_file << ", error: " << e.what(); return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to load procedure yaml file: " + procedure_yaml_file + ", error: " + e.what())); } } } } // dump to json auto res = gs::get_json_string_from_yaml(yaml_list); if (!res.ok()) { return gs::Result<seastar::sstring>( 
gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to dump procedure yaml list to json, error: " + res.status().error_message())); } return gs::Result<seastar::sstring>(std::move(res.value())); } gs::Result<seastar::sstring> WorkDirManipulator::get_procedure_yaml( const std::string& graph_name, const std::string& procedure_name) { auto procedure_yaml_file = GetGraphPluginDir(graph_name) + "/" + procedure_name + ".yaml"; if (!std::filesystem::exists(procedure_yaml_file)) { LOG(ERROR) << "Procedure yaml file not exists: " << procedure_yaml_file; return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Procedure yaml file not exists: " + procedure_yaml_file)); } try { auto procedure_yaml_node = YAML::LoadFile(procedure_yaml_file); // dump to json YAML::Emitter emitter; emitter << procedure_yaml_node; auto str = emitter.c_str(); return gs::Result<seastar::sstring>(std::move(str)); } catch (const std::exception& e) { LOG(ERROR) << "Fail to load procedure yaml file: " << procedure_yaml_file << ", error: " << e.what(); return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to load procedure yaml file: " + procedure_yaml_file + ", error: " + e.what())); } return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Unknown error")); } gs::Result<seastar::sstring> WorkDirManipulator::enable_procedure_on_graph( const std::string& graph_name, const std::string& procedure_name) { LOG(INFO) << "Enabling procedure " << procedure_name << " on graph " << graph_name; auto schema_file = GetGraphSchemaPath(graph_name); if (!std::filesystem::exists(schema_file)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph schema file not exists: " + schema_file + ", graph: " + graph_name)); } YAML::Node schema_node; try { schema_node = YAML::LoadFile(schema_file); } catch (const std::exception& e) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to 
load graph schema: " + schema_file + ", error: " + e.what())); } if (!schema_node["stored_procedures"]) { schema_node["stored_procedures"] = YAML::Node(YAML::NodeType::Map); } auto stored_procedures = schema_node["stored_procedures"]; if (!stored_procedures["enable_lists"]) { stored_procedures["enable_lists"] = YAML::Node(YAML::NodeType::Sequence); } auto enable_lists = stored_procedures["enable_lists"]; // check whether procedure is already in the list, if so, then we raise // error: procedure already exists. for (const auto& item : enable_lists) { if (item.as<std::string>() == procedure_name) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::OK, "Procedure already exists in graph: " + graph_name)); } } enable_lists.push_back(procedure_name); // dump schema to file auto dump_res = dump_yaml_to_file(schema_node, schema_file); if (!dump_res.ok()) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to dump graph schema: " + schema_file + ", error: " + dump_res.status().error_message())); } return gs::Result<seastar::sstring>(gs::Status::OK(), "Success"); } gs::Result<seastar::sstring> WorkDirManipulator::disable_procedure_on_graph( const std::string& graph_name, const std::string& procedure_name) { LOG(INFO) << "Disabling procedure " << procedure_name << " on graph " << graph_name; auto schema_file = GetGraphSchemaPath(graph_name); if (!std::filesystem::exists(schema_file)) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "Graph schema file not exists: " + schema_file + ", graph: " + graph_name)); } YAML::Node schema_node; try { schema_node = YAML::LoadFile(schema_file); } catch (const std::exception& e) { return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to load graph schema: " + schema_file + ", error: " + e.what())); } if (!schema_node["stored_procedures"]) { schema_node["stored_procedures"] = YAML::Node(YAML::NodeType::Map); } auto stored_procedures 
= schema_node["stored_procedures"]; if (!stored_procedures["enable_lists"]) { stored_procedures["enable_lists"] = YAML::Node(YAML::NodeType::Sequence); } auto enable_lists = stored_procedures["enable_lists"]; // check whether procedure is already in the list, if so, then we raise // error: procedure already exists. // remove procedure from enable_lists auto new_enable_list = YAML::Node(YAML::NodeType::Sequence); for (auto iter = enable_lists.begin(); iter != enable_lists.end(); iter++) { if (iter->as<std::string>() == procedure_name) { LOG(INFO) << "Found procedure " << procedure_name << " in enable_lists"; break; } else { new_enable_list.push_back(*iter); } } LOG(INFO) << "after remove: " << enable_lists; stored_procedures["enable_lists"] = new_enable_list; schema_node["stored_procedures"] = stored_procedures; // dump schema to file auto dump_res = dump_yaml_to_file(schema_node, schema_file); if (!dump_res.ok()) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to dump graph schema: " + schema_file + ", error: " + dump_res.status().error_message())); } return gs::Result<seastar::sstring>(gs::Status::OK(), "Success"); } gs::Result<seastar::sstring> WorkDirManipulator::dump_yaml_to_file( const YAML::Node& yaml_node, const std::string& procedure_yaml_file) { try { YAML::Emitter emitter; emitter << yaml_node; auto str = emitter.c_str(); std::ofstream fout(procedure_yaml_file); if (!fout.is_open()) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to open file: " + procedure_yaml_file + ", error: " + std::string(std::strerror(errno)))); } fout << str; fout.close(); } catch (const std::exception& e) { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to dump yaml to file: " + procedure_yaml_file + ", error: " + std::string(e.what()))); } catch (...) 
{ return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to dump yaml to file: " + procedure_yaml_file + ", unknown error")); } LOG(INFO) << "Successfully dump yaml to file: " << procedure_yaml_file; return gs::Result<seastar::sstring>(gs::Status::OK(), "Success"); } gs::Result<seastar::sstring> WorkDirManipulator::GetGraphLoaderBin() { // first via relative path std::string graph_loader_bin_path = (gs::get_current_binary_directory() / std::string(GRAPH_LOADER_BIN)) .string(); if (std::filesystem::exists(graph_loader_bin_path)) { return gs::Result<seastar::sstring>(graph_loader_bin_path); } // test whether executable std::string which_cmd = std::string("which ") + GRAPH_LOADER_BIN + " > /dev/null"; if (std::system(which_cmd.c_str()) == 0) { return gs::Result<seastar::sstring>(graph_loader_bin_path); } else { return gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to find graph loader binary: " + GRAPH_LOADER_BIN)); } } // define LOCK_FILE // define DATA_DIR_NAME const std::string WorkDirManipulator::LOCK_FILE = ".lock"; const std::string WorkDirManipulator::DATA_DIR_NAME = "data"; const std::string WorkDirManipulator::GRAPH_SCHEMA_FILE_NAME = "graph.yaml"; const std::string WorkDirManipulator::GRAPH_INDICES_FILE_NAME = "init_snapshot.bin"; const std::string WorkDirManipulator::GRAPH_INDICES_DIR_NAME = "indices"; const std::string WorkDirManipulator::GRAPH_TEMP_INDICES_DIR_NAME = "temp_indices"; const std::string WorkDirManipulator::GRAPH_PLUGIN_DIR_NAME = "plugins"; const std::string WorkDirManipulator::CONF_ENGINE_CONFIG_FILE_NAME = "interactive_config.yaml"; const std::string WorkDirManipulator::RUNNING_GRAPH_FILE_NAME = "RUNNING"; const std::string WorkDirManipulator::TMP_DIR = "/tmp"; const std::string WorkDirManipulator::GRAPH_LOADER_BIN = "bulk_loader"; const std::string WorkDirManipulator::UPLOAD_DIR = "upload"; } // namespace server