in flex/engines/http_server/actor/admin_actor.act.cc [842:1057]
// Start (or restart) the query service on a graph.
//
// The target graph is read from the optional "graph_id" string member of the
// JSON request payload (query_param.content). When the payload is empty the
// service is restarted on the graph currently reported as running by the
// metadata store.
//
// Steps performed:
//   1. Resolve the graph id and fetch its metadata.
//   2. Make sure the graph indices are locked by us: reuse the lock if the
//      graph is the running one, otherwise acquire it (and fail if someone
//      else holds it).
//   3. Dump the schema together with the non-builtin plugins, then read the
//      schema and data directory back from the work dir.
//   4. Stop the query actors, reopen the GraphDB on the new data directory,
//      unlock the previously running graph, record the new running graph and
//      start the query actors again.
//
// Returns a ready future holding either a success message or the gs::Status
// describing the first failure. If the lock was acquired by this call and a
// later step fails, the lock is released before returning.
seastar::future<admin_query_result> admin_actor::start_service(
    query_param&& query_param) {
  // Wraps a failure status into the ready future every error path returns.
  auto fail =
      [](const gs::Status& status) -> seastar::future<admin_query_result> {
    return seastar::make_ready_future<admin_query_result>(
        gs::Result<seastar::sstring>(status));
  };

  // Parse query_param.content as json and get graph_name.
  auto& content = query_param.content;
  std::string graph_name;
  auto cur_running_graph_res = metadata_store_->GetRunningGraph();
  if (!cur_running_graph_res.ok()) {
    LOG(INFO) << "No running graph, will start on the graph in request";
  }
  auto cur_running_graph = cur_running_graph_res.value();
  // Only a successfully fetched, non-empty id names a graph that may have to
  // be unlocked once the service has moved to another graph.
  const bool has_running_graph =
      cur_running_graph_res.ok() && !cur_running_graph.empty();
  LOG(INFO) << "Current running graph: " << cur_running_graph;
  try {
    if (!content.empty()) {
      rapidjson::Document json;
      if (json.Parse(content.c_str()).HasParseError()) {
        throw std::runtime_error("Fail to parse json: " +
                                 std::to_string(json.GetParseError()));
      }
      // HasMember()/GetString() assert on a wrong value type, so the payload
      // shape must be validated before it is accessed.
      if (!json.IsObject()) {
        throw std::runtime_error("request payload should be a json object");
      }
      if (json.HasMember("graph_id")) {
        const auto& graph_id = json["graph_id"];
        if (!graph_id.IsString()) {
          throw std::runtime_error("graph_id should be a string");
        }
        graph_name = graph_id.GetString();
      }
    } else {
      graph_name = cur_running_graph;
      LOG(WARNING)
          << "Request payload is empty, will restart on current graph: "
          << graph_name;
    }
    LOG(WARNING) << "Starting service with graph: " << graph_name;
  } catch (const std::exception& e) {
    LOG(ERROR) << "Fail to Start service: " << e.what();
    return fail(gs::Status(gs::StatusCode::INVALID_SCHEMA,
                           "Fail to parse json: " + std::string(e.what())));
  }

  auto get_graph_res = metadata_store_->GetGraphMeta(graph_name);
  if (!get_graph_res.ok()) {
    LOG(ERROR) << "Fail to get graph meta: "
               << get_graph_res.status().error_message();
    return fail(get_graph_res.status());
  }

  auto get_lock_res = metadata_store_->GetGraphIndicesLocked(graph_name);
  if (!get_lock_res.ok()) {
    LOG(ERROR) << "Failed to get lock for graph: " << graph_name;
    return fail(get_lock_res.status());
  }
  auto prev_lock = get_lock_res.value();
  if (prev_lock) {
    if (cur_running_graph == graph_name) {
      LOG(INFO) << "Service already running on graph: " << graph_name;
    } else {
      LOG(ERROR) << "The graph is locked but not running: " << graph_name
                 << ", maybe a data loading job is running on this graph";
      return fail(gs::Status(
          gs::StatusCode::ALREADY_LOCKED,
          "The graph is locked but not running: " + graph_name +
              ", maybe a data loading job is running on this graph"));
    }
  } else {
    LOG(INFO) << "The graph is not locked: " << graph_name;
    auto acquire_lock_res = metadata_store_->LockGraphIndices(graph_name);
    if (!acquire_lock_res.ok() || !acquire_lock_res.value()) {
      LOG(ERROR) << "Fail to lock graph: " << graph_name;
      return fail(gs::Status(gs::StatusCode::ALREADY_LOCKED,
                             "Fail to acquire lock for graph: " + graph_name +
                                 ", try again later"));
    }
    LOG(INFO) << "Successfully locked graph: " << graph_name;
  }

  // If the graph was not locked before this call and a later step fails, the
  // lock taken above must be released again. Captures by value so the helper
  // can also be used from the continuation below.
  auto release_new_lock = [this, prev_lock, graph_name]() {
    if (prev_lock) {
      return;
    }
    auto rollback_res = metadata_store_->UnlockGraphIndices(graph_name);
    if (!rollback_res.ok()) {
      LOG(ERROR) << "Fail to unlock graph: " << graph_name;
    }
  };

  // Dump the latest schema to file, which include all enabled plugins.
  auto plugins_res = metadata_store_->GetAllPluginMeta(graph_name);
  if (!plugins_res.ok()) {
    LOG(ERROR) << "Fail to get all plugins: "
               << plugins_res.status().error_message();
    release_new_lock();
    return fail(plugins_res.status());
  }

  // Note that the plugin meta contains both builtin and user-defined plugins,
  // we need to remove the builtin plugins from the plugin meta.
  auto& graph_meta = get_graph_res.value();
  auto& additional_plugins = plugins_res.value();
  auto drop_plugin = [](auto& metas, const auto& plugin_id) {
    metas.erase(std::remove_if(metas.begin(), metas.end(),
                               [&plugin_id](const gs::PluginMeta& meta) {
                                 return meta.id == plugin_id;
                               }),
                metas.end());
  };
  const auto& all_builtin_plugins = gs::get_builtin_plugin_metas();
  for (const auto& builtin_plugin : all_builtin_plugins) {
    drop_plugin(additional_plugins, builtin_plugin.id);
    drop_plugin(graph_meta.plugin_metas, builtin_plugin.id);
  }

  // With all enabled plugins and graph schema, dump to a new schema file.
  auto dump_res =
      WorkDirManipulator::DumpGraphSchema(graph_meta, additional_plugins);
  if (!dump_res.ok()) {
    LOG(ERROR) << "Fail to dump graph schema: "
               << dump_res.status().error_message();
    release_new_lock();
    return fail(dump_res.status());
  }

  auto schema_res = server::WorkDirManipulator::GetGraphSchema(graph_name);
  if (!schema_res.ok()) {
    LOG(ERROR) << "Fail to get graph schema: "
               << schema_res.status().error_message() << ", " << graph_name;
    release_new_lock();
    return fail(schema_res.status());
  }
  auto& schema_value = schema_res.value();

  auto data_dir = server::WorkDirManipulator::GetDataDirectory(graph_name);
  if (!data_dir.ok()) {
    LOG(ERROR) << "Fail to get data directory: "
               << data_dir.status().error_message();
    release_new_lock();
    return fail(data_dir.status());
  }
  auto data_dir_value = data_dir.value();

  // First Stop query_handler's actors.
  // NOTE(review): every failure return inside the continuation happens before
  // start_query_actors(), so the query actors stay stopped on those paths.
  auto& graph_db_service = GraphDBService::get();
  return graph_db_service.stop_query_actors().then(
      [this, graph_name, schema_value, cur_running_graph, has_running_graph,
       data_dir_value, release_new_lock, fail, &graph_db_service] {
        LOG(INFO) << "Successfully stopped query handler";
        {
          std::lock_guard<std::mutex> lock(mtx_);
          auto& db = gs::GraphDB::get();
          LOG(INFO) << "Update service running on graph:" << graph_name;
          // use the previous thread num
          auto thread_num = db.SessionNum();
          auto config = db.config();
          config.data_dir = data_dir_value;
          config.schema = schema_value;
          db.Close();
          VLOG(10) << "Closed the previous graph db";
          if (!db.Open(config).ok()) {
            LOG(ERROR) << "Fail to load graph from data directory: "
                       << data_dir_value;
            release_new_lock();
            return fail(
                gs::Status(gs::StatusCode::INTERNAL_ERROR,
                           "Fail to load graph from data directory: " +
                               data_dir_value));
          }
          LOG(INFO) << "Successfully load graph from data directory: "
                    << data_dir_value;
          // Unlock the previous graph. Skipped when no graph was running:
          // there is nothing to unlock, and unlocking an empty id must not
          // fail a switch that has already succeeded.
          if (has_running_graph && graph_name != cur_running_graph) {
            auto unlock_res =
                metadata_store_->UnlockGraphIndices(cur_running_graph);
            if (!unlock_res.ok()) {
              LOG(ERROR) << "Fail to unlock graph: " << cur_running_graph;
              release_new_lock();
              return fail(unlock_res.status());
            }
          }
          LOG(INFO) << "Update running graph to: " << graph_name;
          auto set_res = metadata_store_->SetRunningGraph(graph_name);
          if (!set_res.ok()) {
            LOG(ERROR) << "Fail to set running graph: " << graph_name;
            release_new_lock();
            return fail(set_res.status());
          }
        }
        graph_db_service.start_query_actors();  // start on a new scope.
        LOG(INFO) << "Successfully started service with graph: " << graph_name;
        graph_db_service.reset_start_time();
        return seastar::make_ready_future<admin_query_result>(
            gs::Result<seastar::sstring>(
                to_message_json("Successfully start service")));
      });
}