flex/engines/http_server/actor/admin_actor.act.cc (1,191 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <filesystem> #include "flex/engines/http_server/actor/admin_actor.act.h" #include "flex/engines/graph_db/database/graph_db.h" #include "flex/engines/graph_db/database/graph_db_session.h" #include "flex/engines/http_server/codegen_proxy.h" #include "flex/engines/http_server/graph_db_service.h" #include "flex/engines/http_server/workdir_manipulator.h" #include "flex/utils/service_utils.h" #include <rapidjson/document.h> #include <rapidjson/pointer.h> #include <rapidjson/rapidjson.h> #include <seastar/core/print.hh> namespace server { std::string to_message_json(const std::string& message) { return "{\"message\":\"" + message + "\"}"; } gs::GraphStatistics get_graph_statistics(const gs::GraphDBSession& sess) { gs::GraphStatistics stat; const auto& graph = sess.graph(); const auto& schema = sess.graph().schema(); auto vertex_label_num = graph.schema().vertex_label_num(); auto edge_label_num = graph.schema().edge_label_num(); for (auto i = 0; i < vertex_label_num; ++i) { stat.total_vertex_count += graph.vertex_num(i); stat.vertex_type_statistics.emplace_back( std::tuple{i, schema.get_vertex_label_name(i), graph.vertex_num(i)}); } for (auto edge_label_id = 0; edge_label_id < edge_label_num; ++edge_label_id) { auto edge_label_name = schema.get_edge_label_name(edge_label_id); std::vector<std::tuple<std::string, std::string, int32_t>> vertex_type_pair_statistics; for (auto src_label_id = 0; src_label_id < vertex_label_num; ++src_label_id) { auto src_label_name = schema.get_vertex_label_name(src_label_id); for (auto dst_label_id = 0; dst_label_id < vertex_label_num; ++dst_label_id) { auto dst_label_name = schema.get_vertex_label_name(dst_label_id); if (schema.exist(src_label_id, dst_label_id, edge_label_id)) { auto oe_csr = graph.get_oe_csr(src_label_id, dst_label_id, edge_label_id); auto ie_csr = graph.get_ie_csr(dst_label_id, src_label_id, edge_label_id); size_t cur_edge_cnt = 0; if (oe_csr) { cur_edge_cnt += oe_csr->edge_num(); } else if (ie_csr) { cur_edge_cnt += ie_csr->edge_num(); } stat.total_edge_count += cur_edge_cnt; vertex_type_pair_statistics.emplace_back( std::tuple{src_label_name, dst_label_name, cur_edge_cnt}); } } } if (!vertex_type_pair_statistics.empty()) { stat.edge_type_statistics.emplace_back(std::tuple{ edge_label_id, edge_label_name, vertex_type_pair_statistics}); } } return stat; } std::string merge_graph_and_plugin_meta( std::shared_ptr<gs::IGraphMetaStore> metadata_store, const std::vector<gs::GraphMeta>& graph_metas) { std::vector<gs::GraphMeta> res_graph_metas; for (auto& graph_meta : graph_metas) { res_graph_metas.push_back(graph_meta); } for (auto& graph_meta : res_graph_metas) { auto all_plugin_meta = metadata_store->GetAllPluginMeta(graph_meta.id); graph_meta.plugin_metas.insert(graph_meta.plugin_metas.end(), all_plugin_meta.value().begin(), all_plugin_meta.value().end()); } rapidjson::Document res(rapidjson::kArrayType); for (auto& graph_meta : res_graph_metas) { rapidjson::Document graph_json(rapidjson::kObjectType, &res.GetAllocator()); graph_meta.ToJson(graph_json, graph_json.GetAllocator()); res.PushBack(graph_json, res.GetAllocator()); } return res.Empty() ? "{}" : gs::rapidjson_stringify(res, 2); } void add_runnable_info(gs::PluginMeta& plugin_meta) { const auto& graph_db = gs::GraphDB::get(); const auto& schema = graph_db.schema(); const auto& plugins_map = schema.GetPlugins(); auto plugin_iter = plugins_map.find(plugin_meta.id); if (plugin_iter != plugins_map.end()) { plugin_meta.runnable = true; } else { plugin_meta.runnable = false; } } gs::Result<gs::JobId> invoke_loading_graph( std::shared_ptr<gs::IGraphMetaStore> metadata_store, const std::string& graph_id, const YAML::Node& loading_config, int32_t loading_thread_num) { // First try to load to a tmp directory. // If the loading process itself is atomic(The tmp data dir will be cleaned // if process terminated unexpectedly, and the original data dir is // recovered) auto tmp_indices_dir = WorkDirManipulator::GetTempIndicesDir(graph_id); WorkDirManipulator::CleanTempIndicesDir(graph_id); auto loading_res = server::WorkDirManipulator::LoadGraph( graph_id, loading_config, loading_thread_num, tmp_indices_dir, metadata_store); if (!loading_res.ok()) { WorkDirManipulator::CleanTempIndicesDir(graph_id); return loading_res.status(); } auto job_id = loading_res.value(); return gs::Result<gs::JobId>(job_id); } seastar::future<seastar::sstring> invoke_creating_procedure( std::shared_ptr<gs::IGraphMetaStore> metadata_store, const std::string& graph_id, const std::string& plugin_creation_parameter) { auto& graph_db_service = GraphDBService::get(); // First create a plugin meta to get the plugin id, then do the real // creation. rapidjson::Document json(rapidjson::kObjectType); if (json.Parse(plugin_creation_parameter.c_str()).HasParseError()) { return seastar::make_exception_future<seastar::sstring>( "Fail to parse parameter as json: " + plugin_creation_parameter); } if (json.HasMember("name")) { // Currently we need id== name rapidjson::Value& name = json["name"]; if (gs::Schema::IsBuiltinPlugin(name.GetString())) { return seastar::make_exception_future<seastar::sstring>( std::string( "The plugin name is a builtin plugin, cannot be created: ") + name.GetString()); } rapidjson::Value name_copy(name, json.GetAllocator()); json.AddMember("id", name_copy, json.GetAllocator()); } json.AddMember("bound_graph", graph_id, json.GetAllocator()); auto nowTime = gs::GetCurrentTimeStamp(); json.AddMember("creation_time", nowTime, json.GetAllocator()); json.AddMember("update_time", nowTime, json.GetAllocator()); if (!json.HasMember("enable")) { json.AddMember("enable", true, json.GetAllocator()); } auto procedure_meta_request = gs::CreatePluginMetaRequest::FromJson(json); LOG(INFO) << "parse create plugin meta:" << procedure_meta_request.ToString(); auto insert_res = metadata_store->CreatePluginMeta(procedure_meta_request); if (!insert_res.ok()) { return seastar::make_exception_future<seastar::sstring>( std::runtime_error(insert_res.status().error_message())); } auto plugin_id = insert_res.value(); return server::WorkDirManipulator::CreateProcedure( graph_id, plugin_id, json, graph_db_service.get_service_config().engine_config_path) .then_wrapped([graph_id = graph_id, old_plugin_id = plugin_id, json = std::move(json), metadata_store = metadata_store](auto&& f) { std::string proc_id; try { proc_id = f.get0(); // proc_yaml path should already checked to exists. if (proc_id.empty()) { metadata_store->DeletePluginMeta(graph_id, old_plugin_id); return seastar::make_exception_future<seastar::sstring>( std::runtime_error("Fail to create plugin: " + proc_id)); } if (proc_id != old_plugin_id) { metadata_store->DeletePluginMeta(graph_id, old_plugin_id); return seastar::make_exception_future<seastar::sstring>( std::runtime_error( std::string( "the generated plugin id is not same as the old one:") + proc_id + " " + old_plugin_id)); } VLOG(10) << "Successfully create plugin and meta: " << proc_id << ", now update " "the plugin meta and update the graph meta: " << graph_id; // Then insert the plugin meta. auto procedure_meta_from_file = WorkDirManipulator::GetProcedureByGraphAndProcedureName(graph_id, proc_id); if (!procedure_meta_from_file.ok()) { VLOG(10) << "Fail to insert plugin meta: " << procedure_meta_from_file.status().error_message(); metadata_store->DeletePluginMeta(graph_id, old_plugin_id); WorkDirManipulator::DeleteProcedure(graph_id, proc_id); return seastar::make_exception_future<seastar::sstring>( std::runtime_error( procedure_meta_from_file.status().error_message() + ", " + proc_id.c_str())); } seastar::sstring procedure_meta_str = procedure_meta_from_file.value(); VLOG(10) << "got procedure meta: " << procedure_meta_str; // When updating procedure meta, we should not change the name. since // neo4j use name as the key. auto internal_plugin_update = gs::UpdatePluginMetaRequest::FromJson(procedure_meta_str); // the field enable should be parsed from json if (json.HasMember("enable")) { internal_plugin_update.enable = json["enable"].GetBool(); } else { internal_plugin_update.enable = true; } // update the library path to the full path if (internal_plugin_update.library.has_value()) { internal_plugin_update.library = WorkDirManipulator::GetGraphPluginDir(graph_id) + "/" + internal_plugin_update.library.value(); } auto str = internal_plugin_update.ToString(); VLOG(10) << "internal plugin update: " << str; auto update_res = metadata_store->UpdatePluginMeta( graph_id, proc_id, internal_plugin_update); VLOG(10) << "update_res: " << update_res.status().ok(); if (!update_res.ok()) { metadata_store->DeletePluginMeta(graph_id, old_plugin_id); WorkDirManipulator::DeleteProcedure(graph_id, proc_id); return seastar::make_exception_future<seastar::sstring>( std::runtime_error(update_res.status().error_message())); } VLOG(10) << "Successfully created procedure: " << proc_id; std::string response = "{\"procedure_id\":\"" + proc_id + "\"}"; return seastar::make_ready_future<seastar::sstring>(response); } catch (std::exception& e) { LOG(ERROR) << "Fail to create plugin: " << e.what(); metadata_store->DeletePluginMeta(graph_id, old_plugin_id); WorkDirManipulator::DeleteProcedure(graph_id, old_plugin_id); return seastar::make_exception_future<seastar::sstring>( std::runtime_error("Fail to create plugin: " + std::string(e.what()))); } }); } gs::Status invoke_delete_plugin_meta( std::shared_ptr<gs::IGraphMetaStore> metadata_store, const std::string& graph_id, const std::string& procedure_id) { // First delete the plugin meta. auto delete_meta_res = metadata_store->DeletePluginMeta(graph_id, procedure_id); if (!delete_meta_res.ok()) { return delete_meta_res.status(); } // Then delete the plugin libxx.so and xxx.yaml on disk auto delete_res = server::WorkDirManipulator::DeleteProcedure(graph_id, procedure_id); if (!delete_res.ok()) { return delete_res.status(); } return gs::Status::OK(); } // util functions gs::Result<seastar::sstring> to_json_str( const std::vector<gs::PluginMeta>& plugin_metas) { rapidjson::Document res(rapidjson::kArrayType); for (auto& plugin_meta : plugin_metas) { rapidjson::Document plugin_json(rapidjson::kObjectType, &res.GetAllocator()); plugin_meta.ToJson(plugin_json, plugin_json.GetAllocator()); res.PushBack(plugin_json, res.GetAllocator()); } return res.Empty() ? gs::Result<seastar::sstring>("{}") : gs::Result<seastar::sstring>(gs::rapidjson_stringify(res)); } gs::Result<seastar::sstring> to_json_str( const std::vector<gs::JobMeta>& job_metas) { rapidjson::Document res(rapidjson::kArrayType); for (auto& job_meta : job_metas) { rapidjson::Document job_json(rapidjson::kObjectType, &res.GetAllocator()); if (job_json.Parse(job_meta.ToJson().c_str()).HasParseError()) { LOG(ERROR) << "Fail to parse job meta"; return gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to parse job meta: ")); } res.PushBack(job_json, res.GetAllocator()); } return gs::Result<seastar::sstring>(gs::rapidjson_stringify(res)); } admin_actor::~admin_actor() { // finalization // ... } admin_actor::admin_actor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr) : hiactor::actor(exec_ctx, addr) { set_max_concurrency(1); // set max concurrency for task reentrancy (stateful) // initialization // ... auto& graph_db_service = GraphDBService::get(); // meta_data_ should be thread safe. metadata_store_ = graph_db_service.get_metadata_store(); } // Create a new Graph with the passed graph config. seastar::future<admin_query_result> admin_actor::run_create_graph( query_param&& query_param) { LOG(INFO) << "Creating Graph: " << query_param.content; gs::Result<std::string> preprocess_schema_str = gs::preprocess_and_check_schema_json_string(query_param.content); if (!preprocess_schema_str.ok()) { return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(preprocess_schema_str.status())); } auto request = gs::CreateGraphMetaRequest::FromJson(preprocess_schema_str.value()); if (!request.ok()) { LOG(ERROR) << "Fail to parse graph meta: " << request.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(request.status())); } auto result = metadata_store_->CreateGraphMeta(request.value()); // we also need to store a graph.yaml on disk, for other services to read. if (result.ok()) { auto dump_res = WorkDirManipulator::DumpGraphSchema( result.value(), request.value().ToString()); if (!dump_res.ok()) { LOG(ERROR) << "Fail to dump graph schema: " << dump_res.status().error_message(); // If dump schema fails, we should delete the graph meta. metadata_store_->DeleteGraphMeta(result.value()); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(dump_res.status())); } else { VLOG(10) << "Successfully created graph"; std::string response = "{\"graph_id\":\"" + result.value() + "\"}"; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(std::move(response))); } } else { LOG(ERROR) << "Fail to create graph: " << result.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(result.status())); } } // get graph schema // query_param is the graph name seastar::future<admin_query_result> admin_actor::run_get_graph_schema( query_param&& query_param) { LOG(INFO) << "Get Graph schema for graph_id: " << query_param.content; auto schema_res = metadata_store_->GetGraphMeta(query_param.content); if (schema_res.ok()) { return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(std::move(schema_res.value().schema))); } else { LOG(ERROR) << "Fail to get graph schema: " << schema_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(schema_res.status())); } } // Get the metadata of a graph. seastar::future<admin_query_result> admin_actor::run_get_graph_meta( query_param&& query_param) { LOG(INFO) << "Get Graph meta for graph_id: " << query_param.content; auto meta_res = metadata_store_->GetGraphMeta(query_param.content); if (meta_res.ok()) { auto get_all_procedure_res = metadata_store_->GetAllPluginMeta(query_param.content); if (get_all_procedure_res.ok()) { VLOG(10) << "Successfully get all procedures: " << get_all_procedure_res.value().size(); auto& all_plugin_metas = get_all_procedure_res.value(); for (auto& plugin_meta : all_plugin_metas) { add_runnable_info(plugin_meta); } auto& graph_meta = meta_res.value(); // There can also be procedures that builtin in the graph meta. for (auto& plugin_meta : graph_meta.plugin_metas) { add_runnable_info(plugin_meta); if (plugin_meta.bound_graph.empty()) { plugin_meta.bound_graph = query_param.content; } } graph_meta.plugin_metas.insert(graph_meta.plugin_metas.end(), all_plugin_metas.begin(), all_plugin_metas.end()); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(std::move(graph_meta.ToJson()))); } else { LOG(ERROR) << "Fail to get all procedures: " << get_all_procedure_res.status().error_message() << " for " << query_param.content; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(get_all_procedure_res.status())); } } else { LOG(ERROR) << "Fail to get graph schema: " << meta_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(meta_res.status())); } } // list all graphs seastar::future<admin_query_result> admin_actor::run_list_graphs( query_param&& query_param) { LOG(INFO) << "List all graphs."; auto all_graph_meta_res = metadata_store_->GetAllGraphMeta(); // The plugin meta are stored separately, so we need to merge them. if (!all_graph_meta_res.ok()) { LOG(ERROR) << "Fail to list graphs: " << all_graph_meta_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(all_graph_meta_res.status())); } else { VLOG(10) << "Successfully list graphs"; // collect all 'schema' field into a json string return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(merge_graph_and_plugin_meta( metadata_store_, all_graph_meta_res.value()))); } } // delete one graph seastar::future<admin_query_result> admin_actor::run_delete_graph( query_param&& query_param) { LOG(INFO) << "Delete graph: " << query_param.content; auto lock_info = metadata_store_->GetGraphIndicesLocked(query_param.content); if (!lock_info.ok()) { LOG(ERROR) << "Fail to get lock info for graph: " << query_param.content; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(lock_info.status())); } if (lock_info.value()) { LOG(ERROR) << "Graph is running, cannot delete: " << query_param.content; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::ALREADY_LOCKED, "Graph is running, cannot delete: " + query_param.content))); } auto get_res = metadata_store_->GetGraphMeta(query_param.content); if (!get_res.ok()) { LOG(ERROR) << "Graph not exists: " << query_param.content; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(get_res.status())); } auto delete_res = metadata_store_->DeleteGraphMeta(query_param.content); if (delete_res.ok()) { // delete the disk data auto delete_plugins_res = metadata_store_->DeletePluginMetaByGraphId(query_param.content); if (!delete_plugins_res.ok()) { LOG(ERROR) << "Fail to delete graph's plugins: " << delete_plugins_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(delete_plugins_res.status())); } WorkDirManipulator::DeleteGraph(query_param.content); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(to_message_json( "Successfully delete graph: " + query_param.content))); } else { LOG(ERROR) << "Fail to delete graph: " << delete_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(delete_res.status())); } } // load the graph. seastar::future<admin_query_result> admin_actor::run_graph_loading( graph_management_param&& query_param) { // query_param contains two parameter, first for graph name, second for // graph config auto content = query_param.content; auto& graph_id = content.first; VLOG(1) << "Parse json payload for graph: " << graph_id; auto& loading_config = content.second; YAML::Node yaml; try { // parse json from query_param.content rapidjson::Document doc; if (doc.Parse(loading_config.c_str()).HasParseError()) { throw std::runtime_error("Fail to parse json: " + std::to_string(doc.GetParseError())); } std::stringstream json_ss; json_ss << loading_config; yaml = YAML::Load(json_ss); } catch (std::exception& e) { LOG(ERROR) << "Fail to parse json: " << e.what(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INVALID_IMPORT_FILE, "Fail to parse json: " + std::string(e.what())))); } catch (...) { LOG(ERROR) << "Fail to parse json: " << loading_config; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INVALID_IMPORT_FILE, "Fail to parse json: "))); } int32_t loading_thread_num = 1; if (yaml["loading_thread_num"]) { loading_thread_num = yaml["loading_thread_num"].as<int32_t>(); } // First check graph exists auto graph_meta_res = metadata_store_->GetGraphMeta(graph_id); if (!graph_meta_res.ok()) { LOG(ERROR) << "Graph not exists: " << graph_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(graph_meta_res.status())); } // try to lock the graph indices dir auto lock_res = metadata_store_->LockGraphIndices(graph_id); if (!lock_res.ok() || !lock_res.value()) { LOG(ERROR) << "Fail to lock graph indices dir: " << graph_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::ALREADY_LOCKED, "Fail to acquire lock for graph indices dir: " + graph_id + ", maybe the graph is already running"))); } std::string graph_id_str = graph_id.c_str(); auto job_id_res = invoke_loading_graph(metadata_store_, graph_id_str, yaml, loading_thread_num); if (!job_id_res.ok()) { LOG(ERROR) << "Fail to run graph loading : " << job_id_res.status().error_message(); metadata_store_->UnlockGraphIndices(graph_id); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(job_id_res.status())); } seastar::sstring res = "{\"job_id\":\"" + job_id_res.value() + "\"}"; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(std::move(res))); } // Get all procedure with graph_name and procedure_name seastar::future<admin_query_result> admin_actor::get_procedure_by_procedure_name( procedure_query_param&& query_param) { auto& graph_id = query_param.content.first; auto& procedure_id = query_param.content.second; auto get_graph_res = metadata_store_->GetGraphMeta(graph_id); if (!get_graph_res.ok()) { LOG(ERROR) << "Graph not exists: " << graph_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(get_graph_res.status())); } LOG(INFO) << "Get procedure: " << procedure_id << " for graph: " << graph_id; auto get_procedure_res = metadata_store_->GetPluginMeta(graph_id, procedure_id); auto builtin_plugins = gs::get_builtin_plugin_metas(); for (auto& builtin_plugin : builtin_plugins) { if (builtin_plugin.id == procedure_id.c_str()) { add_runnable_info(builtin_plugin); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(builtin_plugin.ToJson())); } } if (get_procedure_res.ok()) { VLOG(10) << "Successfully get procedure procedures"; auto& proc_meta = get_procedure_res.value(); add_runnable_info(proc_meta); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(proc_meta.ToJson())); } else { LOG(ERROR) << "Fail to get procedure for graph: " << graph_id << " and procedure: " << procedure_id << ", error message: " << get_procedure_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(get_procedure_res.status())); } } // Get all procedures of one graph. seastar::future<admin_query_result> admin_actor::get_procedures_by_graph_name( query_param&& query_param) { auto& graph_id = query_param.content; // first check graph exists auto graph_meta_res = metadata_store_->GetGraphMeta(graph_id); if (!graph_meta_res.ok()) { LOG(ERROR) << "Graph not exists: " << graph_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(graph_meta_res.status())); } auto get_all_procedure_res = metadata_store_->GetAllPluginMeta(graph_id); if (get_all_procedure_res.ok()) { VLOG(10) << "Successfully get all procedures: " << get_all_procedure_res.value().size(); auto& all_plugin_metas = get_all_procedure_res.value(); for (auto& plugin_meta : all_plugin_metas) { add_runnable_info(plugin_meta); } for (auto& plugin_meta : graph_meta_res.value().plugin_metas) { add_runnable_info(plugin_meta); } all_plugin_metas.insert(all_plugin_metas.end(), graph_meta_res.value().plugin_metas.begin(), graph_meta_res.value().plugin_metas.end()); return seastar::make_ready_future<admin_query_result>( to_json_str(all_plugin_metas)); } else { LOG(ERROR) << "Fail to get all procedures: " << get_all_procedure_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(get_all_procedure_res.status())); } } seastar::future<admin_query_result> admin_actor::create_procedure( create_procedure_query_param&& query_param) { auto& graph_id = query_param.content.first; auto& parameter = query_param.content.second; auto graph_meta_res = metadata_store_->GetGraphMeta(graph_id); if (!graph_meta_res.ok()) { LOG(ERROR) << "Graph not exists: " << graph_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(graph_meta_res.status())); } auto lock_res = metadata_store_->LockGraphPlugins(graph_id); if (!lock_res.ok() || !lock_res.value()) { LOG(ERROR) << "Fail to lock graph plugin dir: " << graph_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::ALREADY_LOCKED, "Fail to acquire lock for graph plugin dir: " + graph_id + ", try again later"))); } return invoke_creating_procedure(metadata_store_, graph_id, parameter) .then_wrapped([this, graph_id = graph_id](auto&& f) { auto unlock_res = metadata_store_->UnlockGraphPlugins(graph_id); if (!unlock_res.ok()) { LOG(ERROR) << "Fail to unlock graph plugin dir: " << graph_id; } try { auto res = f.get(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(std::move(res))); } catch (std::exception& e) { LOG(ERROR) << "Fail to create procedure: " << e.what(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to create procedure: " + std::string(e.what())))); } }); } // Delete a procedure by graph name and procedure name seastar::future<admin_query_result> admin_actor::delete_procedure( create_procedure_query_param&& query_param) { auto& graph_id = query_param.content.first; auto& procedure_id = query_param.content.second; auto graph_meta_res = metadata_store_->GetGraphMeta(graph_id); if (!graph_meta_res.ok()) { LOG(ERROR) << "Graph not exists: " << graph_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(graph_meta_res.status())); } if (gs::Schema::IsBuiltinPlugin(procedure_id)) { LOG(ERROR) << "The plugin name is a builtin plugin, cannot be deleted: " << procedure_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::ILLEGAL_OPERATION, "The plugin name is a builtin plugin, cannot be deleted: " + procedure_id))); } auto get_procedure_res = metadata_store_->GetPluginMeta(graph_id, procedure_id); if (!get_procedure_res.ok()) { LOG(ERROR) << "Procedure " << procedure_id << " not exists on graph: " << graph_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::NOT_FOUND, "Procedure " + procedure_id + " not exists on graph: " + graph_id))); } auto lock_res = metadata_store_->LockGraphPlugins(graph_id); if (!lock_res.ok() || !lock_res.value()) { LOG(ERROR) << "Fail to lock graph plugin dir: " << graph_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::ALREADY_LOCKED, "Fail to acquire lock for graph plugin dir: " + graph_id + ", try again later"))); } auto delete_res = invoke_delete_plugin_meta(metadata_store_, graph_id, procedure_id); auto unlock_res = metadata_store_->UnlockGraphPlugins(graph_id); if (!unlock_res.ok()) { LOG(ERROR) << "Fail to unlock graph plugin dir: " << graph_id; } if (!delete_res.ok()) { LOG(ERROR) << "Fail to run delete procedure: " << delete_res.error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(delete_res)); } VLOG(10) << "Successfully delete procedure: " << procedure_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( to_message_json("Successfully delete procedure: " + procedure_id))); } // update a procedure by graph name and procedure name seastar::future<admin_query_result> admin_actor::update_procedure( update_procedure_query_param&& query_param) { auto& graph_id = std::get<0>(query_param.content); auto& procedure_id = std::get<1>(query_param.content); auto& update_request_json = std::get<2>(query_param.content); auto graph_meta_res = metadata_store_->GetGraphMeta(graph_id); if (!graph_meta_res.ok()) { LOG(ERROR) << "Graph not exists: " << graph_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(graph_meta_res.status())); } if (gs::Schema::IsBuiltinPlugin(procedure_id)) { LOG(ERROR) << "The plugin name is a builtin plugin, cannot be updated: " << procedure_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::ILLEGAL_OPERATION, "The plugin name is a builtin plugin, cannot be updated: " + procedure_id))); } auto get_procedure_res = metadata_store_->GetPluginMeta(graph_id, procedure_id); if (!get_procedure_res.ok()) { LOG(ERROR) << "Procedure not exists: " << procedure_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(get_procedure_res.status())); } VLOG(10) << "update request json: " << update_request_json; auto req = gs::UpdatePluginMetaRequest::FromJson(update_request_json); VLOG(10) << "Update plugin req: " << req.ToString(); // If updatePluginMetaRequest contains field params, returns, library, and // option, we warning and return. if (req.params.has_value() || req.returns.has_value() || req.library.has_value() || req.option.has_value()) { LOG(ERROR) << "UpdatePluginMetaRequest contains field params, returns, " "library, or option, which should not be updated."; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::ILLEGAL_OPERATION, "UpdatePluginMetaRequest contains field params, returns, library, " "and option, which should not be updated."))); } if (req.name.has_value()) { LOG(ERROR) << "UpdatePluginMetaRequest contains field 'name', which should " "not be updated."; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::ILLEGAL_OPERATION, "UpdatePluginMetaRequest contains field " "'name', which should not be updated."))); } auto update_res = metadata_store_->UpdatePluginMeta(graph_id, procedure_id, req); if (update_res.ok()) { VLOG(10) << "Successfully update procedure: " << procedure_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( to_message_json("Successfully update procedure: " + procedure_id))); } else { LOG(ERROR) << "Fail to create procedure: " << update_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(update_res.status())); } } // Start service on a graph first means stop all current running actors, then // switch graph and create new actors with a unused scope_id. seastar::future<admin_query_result> admin_actor::start_service( query_param&& query_param) { // parse query_param.content as json and get graph_name auto& content = query_param.content; std::string graph_name; auto cur_running_graph_res = metadata_store_->GetRunningGraph(); if (!cur_running_graph_res.ok()) { LOG(INFO) << "No running graph, will start on the graph in request"; } auto cur_running_graph = cur_running_graph_res.value(); LOG(INFO) << "Current running graph: " << cur_running_graph; try { if (!content.empty()) { rapidjson::Document json; if (json.Parse(content.c_str()).HasParseError()) { throw std::runtime_error("Fail to parse json: " + std::to_string(json.GetParseError())); } if (json.HasMember("graph_id")) { graph_name = json["graph_id"].GetString(); } } else { graph_name = cur_running_graph; LOG(WARNING) << "Request payload is empty, will restart on current graph: " << graph_name; } LOG(WARNING) << "Starting service with graph: " << graph_name; } catch (std::exception& e) { LOG(ERROR) << "Fail to Start service: "; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INVALID_SCHEMA, "Fail to parse json: " + std::string(e.what())))); } auto get_graph_res = metadata_store_->GetGraphMeta(graph_name); if (!get_graph_res.ok()) { LOG(ERROR) << "Fail to get graph meta: " << get_graph_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(get_graph_res.status())); } auto get_lock_res = metadata_store_->GetGraphIndicesLocked(graph_name); if (!get_lock_res.ok()) { LOG(ERROR) << "Failed to get lock for graph: " << graph_name; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(get_lock_res.status())); } auto prev_lock = get_lock_res.value(); if (prev_lock) { if (cur_running_graph == graph_name) { LOG(INFO) << "Service already running on graph: " << graph_name; } else { LOG(ERROR) << "The graph is locked but not running: " << graph_name << ", maybe a data loading job is running on this graph"; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::ALREADY_LOCKED, "The graph is locked but not running: " + graph_name + ", maybe a data loading job is running on this graph"))); } } else { LOG(INFO) << "The graph is not locked: " << graph_name; auto acquire_lock_res = metadata_store_->LockGraphIndices(graph_name); if (!acquire_lock_res.ok() || !acquire_lock_res.value()) { LOG(ERROR) << "Fail to lock graph: " << graph_name; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::ALREADY_LOCKED, "Fail to acquire lock for graph: " + graph_name + ", try again later"))); } LOG(INFO) << "Successfully locked graph: " << graph_name; } // Dump the latest schema to file, which include all enabled plugins. auto plugins_res = metadata_store_->GetAllPluginMeta(graph_name); if (!plugins_res.ok()) { LOG(ERROR) << "Fail to get all plugins: " << plugins_res.status().error_message(); if (!prev_lock) { // If the graph is not locked before, and we fail at some // steps after locking, we should unlock it. metadata_store_->UnlockGraphIndices(graph_name); } return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(plugins_res.status())); } // Note that the plugin meta contains both builtin and user-defined plugins, // we need to remove the builtin plugins from the plugin meta. auto& graph_meta = get_graph_res.value(); auto& additional_plugins = plugins_res.value(); const auto& all_builtin_plugins = gs::get_builtin_plugin_metas(); for (const auto& builtin_plugin : all_builtin_plugins) { auto it = std::remove_if(additional_plugins.begin(), additional_plugins.end(), [&builtin_plugin](const gs::PluginMeta& plugin_meta) { return plugin_meta.id == builtin_plugin.id; }); additional_plugins.erase(it, additional_plugins.end()); auto it2 = std::remove_if( graph_meta.plugin_metas.begin(), graph_meta.plugin_metas.end(), [&builtin_plugin](const gs::PluginMeta& plugin_meta) { return plugin_meta.id == builtin_plugin.id; }); graph_meta.plugin_metas.erase(it2, graph_meta.plugin_metas.end()); } // With all enabled plugins and graph schema, dump to a new schema file. auto dump_res = WorkDirManipulator::DumpGraphSchema(graph_meta, additional_plugins); if (!dump_res.ok()) { LOG(ERROR) << "Fail to dump graph schema: " << dump_res.status().error_message(); if (!prev_lock) { metadata_store_->UnlockGraphIndices(graph_name); } return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(dump_res.status())); } auto schema_res = server::WorkDirManipulator::GetGraphSchema(graph_name); if (!schema_res.ok()) { LOG(ERROR) << "Fail to get graph schema: " << schema_res.status().error_message() << ", " << graph_name; if (!prev_lock) { metadata_store_->UnlockGraphIndices(graph_name); } return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(schema_res.status())); } auto& schema_value = schema_res.value(); auto data_dir = server::WorkDirManipulator::GetDataDirectory(graph_name); if (!data_dir.ok()) { LOG(ERROR) << "Fail to get data directory: " << data_dir.status().error_message(); if (!prev_lock) { // If the graph is not locked before, and we fail at // some steps after locking, we should unlock it. metadata_store_->UnlockGraphIndices(graph_name); } return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(data_dir.status())); } auto data_dir_value = data_dir.value(); // First Stop query_handler's actors. auto& graph_db_service = GraphDBService::get(); return graph_db_service.stop_query_actors().then( [this, prev_lock, graph_name, schema_value, cur_running_graph, data_dir_value, &graph_db_service] { LOG(INFO) << "Successfully stopped query handler"; { std::lock_guard<std::mutex> lock(mtx_); auto& db = gs::GraphDB::get(); LOG(INFO) << "Update service running on graph:" << graph_name; // use the previous thread num auto thread_num = db.SessionNum(); auto config = db.config(); config.data_dir = data_dir_value; config.schema = schema_value; db.Close(); VLOG(10) << "Closed the previous graph db"; if (!db.Open(config).ok()) { LOG(ERROR) << "Fail to load graph from data directory: " << data_dir_value; if (!prev_lock) { // If the graph is not locked before, and we // fail at some steps after locking, we should // unlock it. metadata_store_->UnlockGraphIndices(graph_name); } return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to load graph from data directory: " + data_dir_value))); } LOG(INFO) << "Successfully load graph from data directory: " << data_dir_value; // unlock the previous graph if (graph_name != cur_running_graph) { auto unlock_res = metadata_store_->UnlockGraphIndices(cur_running_graph); if (!unlock_res.ok()) { LOG(ERROR) << "Fail to unlock graph: " << cur_running_graph; if (!prev_lock) { metadata_store_->UnlockGraphIndices(graph_name); } return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(unlock_res.status())); } } LOG(INFO) << "Update running graph to: " << graph_name; auto set_res = metadata_store_->SetRunningGraph(graph_name); if (!set_res.ok()) { LOG(ERROR) << "Fail to set running graph: " << graph_name; if (!prev_lock) { metadata_store_->UnlockGraphIndices(graph_name); } return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(set_res.status())); } } graph_db_service.start_query_actors(); // start on a new scope. LOG(INFO) << "Successfully started service with graph: " << graph_name; graph_db_service.reset_start_time(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( to_message_json("Successfully start service"))); }); } // Stop service. // Actually stop the query_handler's actors. // The port is still connectable. seastar::future<admin_query_result> admin_actor::stop_service( query_param&& query_param) { // Try to get the json content from query_param std::string graph_id = ""; try { auto& content = query_param.content; if (!content.empty()) { rapidjson::Document json; if (json.Parse(content.c_str()).HasParseError()) { throw std::runtime_error("Fail to parse json: " + std::to_string(json.GetParseError())); } if (json.HasMember("graph_id")) { graph_id = json["graph_id"].GetString(); } LOG(INFO) << "Stop service with graph: " << graph_id; } } catch (std::exception& e) { LOG(ERROR) << "Fail to stop service: "; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::BAD_REQUEST, "Fail to parse json: " + std::string(e.what())))); } auto& graph_db_service = GraphDBService::get(); return graph_db_service.stop_query_actors().then([this, graph_id] { LOG(INFO) << "Successfully stopped query handler"; // Add also remove current running graph { std::lock_guard<std::mutex> lock(mtx_); // unlock the graph auto cur_running_graph_res = metadata_store_->GetRunningGraph(); if (cur_running_graph_res.ok()) { if (!graph_id.empty() && graph_id != cur_running_graph_res.value()) { LOG(ERROR) << "The specified graph is not running: " << cur_running_graph_res.value(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::NOT_FOUND, "The graph is not running: " + cur_running_graph_res.value()))); } auto unlock_res = metadata_store_->UnlockGraphIndices(cur_running_graph_res.value()); if (!unlock_res.ok()) { LOG(ERROR) << "Fail to unlock graph: " << cur_running_graph_res.value(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(unlock_res.status())); } if (!metadata_store_->ClearRunningGraph().ok()) { LOG(ERROR) << "Fail to clear running graph"; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::INTERNAL_ERROR, "Fail to clear running graph"))); } } return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( to_message_json("Successfully stop service"))); } }); } seastar::future<admin_query_result> admin_actor::service_ready( query_param&& query_param) { auto& graph_db_service = GraphDBService::get(); return graph_db_service.is_actors_running() ? seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>("true")) : seastar::make_exception_future<admin_query_result>( gs::Status(gs::StatusCode::SERVICE_UNAVAILABLE, "Service is not ready")); } // get service status seastar::future<admin_query_result> admin_actor::service_status( query_param&& query_param) { auto& graph_db_service = GraphDBService::get(); auto query_port = graph_db_service.get_query_port(); auto running_graph_res = metadata_store_->GetRunningGraph(); rapidjson::Document res(rapidjson::kObjectType); if (query_port != 0) { res.AddMember("statistics_enabled", true, res.GetAllocator()); res.AddMember("status", graph_db_service.is_actors_running() ? "Running" : "Stopped", res.GetAllocator()); res.AddMember("hqps_port", query_port, res.GetAllocator()); res.AddMember("bolt_port", graph_db_service.get_service_config().bolt_port, res.GetAllocator()); res.AddMember("gremlin_port", graph_db_service.get_service_config().gremlin_port, res.GetAllocator()); if (running_graph_res.ok()) { auto graph_meta_res = metadata_store_->GetGraphMeta(running_graph_res.value()); if (graph_meta_res.ok()) { auto& graph_meta = graph_meta_res.value(); // Add the plugin meta. auto get_all_procedure_res = metadata_store_->GetAllPluginMeta(running_graph_res.value()); if (get_all_procedure_res.ok()) { auto& all_plugin_metas = get_all_procedure_res.value(); for (auto& plugin_meta : all_plugin_metas) { add_runnable_info(plugin_meta); } for (auto& plugin_meta : graph_meta.plugin_metas) { add_runnable_info(plugin_meta); } for (auto& plugin_meta : all_plugin_metas) { if (plugin_meta.runnable) { graph_meta.plugin_metas.emplace_back(plugin_meta); } } rapidjson::Document graph_json(rapidjson::kObjectType, &res.GetAllocator()); graph_meta.ToJson(graph_json, graph_json.GetAllocator()); res.AddMember("graph", graph_json, res.GetAllocator()); } else { LOG(ERROR) << "Fail to get all procedures: " << get_all_procedure_res.status().error_message(); return seastar::make_exception_future<admin_query_result>( get_all_procedure_res.status()); } } else { LOG(ERROR) << "Fail to get graph meta: " << graph_meta_res.status().error_message(); res.AddMember("graph", rapidjson::Value(rapidjson::kNullType), res.GetAllocator()); return seastar::make_exception_future<admin_query_result>( graph_meta_res.status()); } } else { res.AddMember("graph", rapidjson::Value(rapidjson::kNullType), res.GetAllocator()); LOG(INFO) << "No graph is running"; } res.AddMember("start_time", graph_db_service.get_start_time(), res.GetAllocator()); } else { LOG(INFO) << "Query service has not been inited!"; res.AddMember("status", "Query service has not been inited!", res.GetAllocator()); } return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::rapidjson_stringify(res))); } // get node status. seastar::future<admin_query_result> admin_actor::node_status( query_param&& query_param) { // get current host' cpu usage and memory usage auto cpu_usage = gs::get_current_cpu_usage(); auto mem_usage = gs::get_total_physical_memory_usage(); // construct the result json string rapidjson::Document json(rapidjson::kObjectType); { std::stringstream ss; if (cpu_usage.first < 0 || cpu_usage.second <= 0) { ss << "cpu_usage is not available"; } else { ss << "cpu_usage is " << cpu_usage.first << " / " << cpu_usage.second; } json.AddMember("cpu_usage", ss.str(), json.GetAllocator()); } { std::stringstream ss; ss << "memory_usage is " << gs::memory_to_mb_str(mem_usage.first) << " / " << gs::memory_to_mb_str(mem_usage.second); json.AddMember("memory_usage", ss.str(), json.GetAllocator()); } return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::rapidjson_stringify(json))); } ///////////////////////// Job related ///////////////////////// seastar::future<admin_query_result> admin_actor::get_job( query_param&& query_param) { auto& job_id = query_param.content; auto job_meta_res = metadata_store_->GetJobMeta(job_id); if (job_meta_res.ok()) { VLOG(10) << "Successfully get job: " << job_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(job_meta_res.value().ToJson())); } else { LOG(ERROR) << "Fail to get job: " << job_id << ", error message: " << job_meta_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(job_meta_res.status())); } } seastar::future<admin_query_result> admin_actor::list_jobs( query_param&& query_param) { auto list_res = metadata_store_->GetAllJobMeta(); if (list_res.ok()) { VLOG(10) << "Successfully list jobs"; return seastar::make_ready_future<admin_query_result>( to_json_str(list_res.value())); } else { LOG(ERROR) << "Fail to list jobs: " << list_res.status().error_message(); return seastar::make_ready_future<admin_query_result>(list_res.status()); } } // cancel job seastar::future<admin_query_result> admin_actor::cancel_job( query_param&& query_param) { auto& job_id = query_param.content; auto get_job_meta_res = metadata_store_->GetJobMeta(job_id); if (!get_job_meta_res.ok()) { LOG(ERROR) << "Job not exists: " << job_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(get_job_meta_res.status())); } auto& job_meta = get_job_meta_res.value(); if (job_meta.process_id <= 0) { LOG(ERROR) << "Invalid process id: " << job_meta.process_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::StatusCode::INTERNAL_ERROR, "Invalid process id: " + std::to_string(job_meta.process_id))); } // if job is already cancelled, return directly. if (job_meta.status == gs::JobStatus::kCancelled || job_meta.status == gs::JobStatus::kFailed || job_meta.status == gs::JobStatus::kSuccess) { return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::ILLEGAL_OPERATION, "Job already " + std::to_string(job_meta.status) + ": " + job_id.c_str()))); } if (job_meta.status == gs::JobStatus::kUnknown) { VLOG(10) << "Job status is unknown, try cancelling"; } boost::process::child::child_handle child(job_meta.process_id); std::error_code ec; boost::process::detail::api::terminate(child, ec); VLOG(10) << "Killing process: " << job_meta.process_id << ", res: " << ec.message(); if (ec.value() != 0) { LOG(ERROR) << "Fail to kill process: " << job_meta.process_id << ", error message: " << ec.message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::INTERNAL_ERROR, "Fail to kill process: " + std::to_string(job_meta.process_id) + ", error message: " + ec.message()))); } // Now update job meta to cancelled. auto update_job_meta_request = gs::UpdateJobMetaRequest::NewCancel(); auto cancel_meta_res = metadata_store_->UpdateJobMeta(job_id, update_job_meta_request); if (cancel_meta_res.ok()) { VLOG(10) << "Successfully cancel job: " << job_id; return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( to_message_json("Successfully cancel job: " + job_id))); } else { LOG(ERROR) << "Fail to cancel job: " << job_id << ", error message: " << cancel_meta_res.status().error_message(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(cancel_meta_res.status())); } } // Get the statistics of the current running graph, if no graph is running, // return empty. seastar::future<admin_query_result> admin_actor::run_get_graph_statistic( query_param&& query_param) { std::string queried_graph = query_param.content.c_str(); auto cur_running_graph_res = metadata_store_->GetRunningGraph(); if (!cur_running_graph_res.ok()) { // no graph is running return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(gs::Status( gs::StatusCode::NOT_FOUND, "No graph is running currently"))); } auto& graph_id = cur_running_graph_res.value(); if (graph_id != queried_graph) { return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( gs::Status(gs::StatusCode::NOT_FOUND, "The queried graph is not running: " + graph_id + ", current running graph is: " + queried_graph))); } auto statistics = get_graph_statistics( gs::GraphDB::get().GetSession(hiactor::local_shard_id())); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(statistics.ToJson())); } seastar::future<admin_query_result> admin_actor::upload_file( graph_management_param&& query_param) { auto upload_res = WorkDirManipulator::CreateFile(query_param.content.first, query_param.content.second); if (upload_res.ok()) { auto value = upload_res.value(); return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>( seastar::sstring(value.data(), value.size()))); } else { return seastar::make_ready_future<admin_query_result>( gs::Result<seastar::sstring>(upload_res.status())); } } } // namespace server