seastar::future admin_actor::start_service()

in flex/engines/http_server/actor/admin_actor.act.cc [842:1057]


seastar::future<admin_query_result> admin_actor::start_service(
    query_param&& query_param) {
  // parse query_param.content as json and get graph_name
  auto& content = query_param.content;
  std::string graph_name;

  auto cur_running_graph_res = metadata_store_->GetRunningGraph();
  if (!cur_running_graph_res.ok()) {
    LOG(INFO) << "No running graph, will start on the graph in request";
  }
  auto cur_running_graph = cur_running_graph_res.value();
  LOG(INFO) << "Current running graph: " << cur_running_graph;
  try {
    if (!content.empty()) {
      rapidjson::Document json;
      if (json.Parse(content.c_str()).HasParseError()) {
        throw std::runtime_error("Fail to parse json: " +
                                 std::to_string(json.GetParseError()));
      }
      if (json.HasMember("graph_id")) {
        graph_name = json["graph_id"].GetString();
      }
    } else {
      graph_name = cur_running_graph;
      LOG(WARNING)
          << "Request payload is empty, will restart on current graph: "
          << graph_name;
    }
    LOG(WARNING) << "Starting service with graph: " << graph_name;
  } catch (std::exception& e) {
    LOG(ERROR) << "Fail to Start service: ";
    return seastar::make_ready_future<admin_query_result>(
        gs::Result<seastar::sstring>(
            gs::Status(gs::StatusCode::INVALID_SCHEMA,
                       "Fail to parse json: " + std::string(e.what()))));
  }

  auto get_graph_res = metadata_store_->GetGraphMeta(graph_name);
  if (!get_graph_res.ok()) {
    LOG(ERROR) << "Fail to get graph meta: "
               << get_graph_res.status().error_message();
    return seastar::make_ready_future<admin_query_result>(
        gs::Result<seastar::sstring>(get_graph_res.status()));
  }

  auto get_lock_res = metadata_store_->GetGraphIndicesLocked(graph_name);
  if (!get_lock_res.ok()) {
    LOG(ERROR) << "Failed to get lock for graph: " << graph_name;
    return seastar::make_ready_future<admin_query_result>(
        gs::Result<seastar::sstring>(get_lock_res.status()));
  }
  auto prev_lock = get_lock_res.value();
  if (prev_lock) {
    if (cur_running_graph == graph_name) {
      LOG(INFO) << "Service already running on graph: " << graph_name;
    } else {
      LOG(ERROR) << "The graph is locked but not running: " << graph_name
                 << ", maybe a data loading job is running on this graph";
      return seastar::make_ready_future<admin_query_result>(
          gs::Result<seastar::sstring>(gs::Status(
              gs::StatusCode::ALREADY_LOCKED,
              "The graph is locked but not running: " + graph_name +
                  ", maybe a data loading job is running on this graph")));
    }
  } else {
    LOG(INFO) << "The graph is not locked: " << graph_name;
    auto acquire_lock_res = metadata_store_->LockGraphIndices(graph_name);
    if (!acquire_lock_res.ok() || !acquire_lock_res.value()) {
      LOG(ERROR) << "Fail to lock graph: " << graph_name;
      return seastar::make_ready_future<admin_query_result>(
          gs::Result<seastar::sstring>(
              gs::Status(gs::StatusCode::ALREADY_LOCKED,
                         "Fail to acquire lock for graph: " + graph_name +
                             ", try again later")));
    }
    LOG(INFO) << "Successfully locked graph: " << graph_name;
  }

  // Dump the latest schema to file, which include all enabled plugins.
  auto plugins_res = metadata_store_->GetAllPluginMeta(graph_name);
  if (!plugins_res.ok()) {
    LOG(ERROR) << "Fail to get all plugins: "
               << plugins_res.status().error_message();
    if (!prev_lock) {
      // If the graph is not locked before, and we fail at some
      // steps after locking, we should unlock it.
      metadata_store_->UnlockGraphIndices(graph_name);
    }
    return seastar::make_ready_future<admin_query_result>(
        gs::Result<seastar::sstring>(plugins_res.status()));
  }
  // Note that the plugin meta contains both builtin and user-defined plugins,
  // we need to remove the builtin plugins from the plugin meta.
  auto& graph_meta = get_graph_res.value();
  auto& additional_plugins = plugins_res.value();
  const auto& all_builtin_plugins = gs::get_builtin_plugin_metas();
  for (const auto& builtin_plugin : all_builtin_plugins) {
    auto it =
        std::remove_if(additional_plugins.begin(), additional_plugins.end(),
                       [&builtin_plugin](const gs::PluginMeta& plugin_meta) {
                         return plugin_meta.id == builtin_plugin.id;
                       });
    additional_plugins.erase(it, additional_plugins.end());
    auto it2 = std::remove_if(
        graph_meta.plugin_metas.begin(), graph_meta.plugin_metas.end(),
        [&builtin_plugin](const gs::PluginMeta& plugin_meta) {
          return plugin_meta.id == builtin_plugin.id;
        });
    graph_meta.plugin_metas.erase(it2, graph_meta.plugin_metas.end());
  }

  // With all enabled plugins and graph schema, dump to a new schema file.
  auto dump_res =
      WorkDirManipulator::DumpGraphSchema(graph_meta, additional_plugins);
  if (!dump_res.ok()) {
    LOG(ERROR) << "Fail to dump graph schema: "
               << dump_res.status().error_message();
    if (!prev_lock) {
      metadata_store_->UnlockGraphIndices(graph_name);
    }
    return seastar::make_ready_future<admin_query_result>(
        gs::Result<seastar::sstring>(dump_res.status()));
  }

  auto schema_res = server::WorkDirManipulator::GetGraphSchema(graph_name);
  if (!schema_res.ok()) {
    LOG(ERROR) << "Fail to get graph schema: "
               << schema_res.status().error_message() << ", " << graph_name;
    if (!prev_lock) {
      metadata_store_->UnlockGraphIndices(graph_name);
    }
    return seastar::make_ready_future<admin_query_result>(
        gs::Result<seastar::sstring>(schema_res.status()));
  }
  auto& schema_value = schema_res.value();
  auto data_dir = server::WorkDirManipulator::GetDataDirectory(graph_name);
  if (!data_dir.ok()) {
    LOG(ERROR) << "Fail to get data directory: "
               << data_dir.status().error_message();
    if (!prev_lock) {  // If the graph is not locked before, and we fail at
                       // some steps after locking, we should unlock it.
      metadata_store_->UnlockGraphIndices(graph_name);
    }
    return seastar::make_ready_future<admin_query_result>(
        gs::Result<seastar::sstring>(data_dir.status()));
  }
  auto data_dir_value = data_dir.value();

  // First Stop query_handler's actors.

  auto& graph_db_service = GraphDBService::get();
  return graph_db_service.stop_query_actors().then(
      [this, prev_lock, graph_name, schema_value, cur_running_graph,
       data_dir_value, &graph_db_service] {
        LOG(INFO) << "Successfully stopped query handler";

        {
          std::lock_guard<std::mutex> lock(mtx_);
          auto& db = gs::GraphDB::get();
          LOG(INFO) << "Update service running on graph:" << graph_name;

          // use the previous thread num
          auto thread_num = db.SessionNum();
          auto config = db.config();
          config.data_dir = data_dir_value;
          config.schema = schema_value;
          db.Close();
          VLOG(10) << "Closed the previous graph db";
          if (!db.Open(config).ok()) {
            LOG(ERROR) << "Fail to load graph from data directory: "
                       << data_dir_value;
            if (!prev_lock) {  // If the graph is not locked before, and we
                               // fail at some steps after locking, we should
                               // unlock it.
              metadata_store_->UnlockGraphIndices(graph_name);
            }
            return seastar::make_ready_future<admin_query_result>(
                gs::Result<seastar::sstring>(
                    gs::Status(gs::StatusCode::INTERNAL_ERROR,
                               "Fail to load graph from data directory: " +
                                   data_dir_value)));
          }
          LOG(INFO) << "Successfully load graph from data directory: "
                    << data_dir_value;
          // unlock the previous graph
          if (graph_name != cur_running_graph) {
            auto unlock_res =
                metadata_store_->UnlockGraphIndices(cur_running_graph);
            if (!unlock_res.ok()) {
              LOG(ERROR) << "Fail to unlock graph: " << cur_running_graph;
              if (!prev_lock) {
                metadata_store_->UnlockGraphIndices(graph_name);
              }
              return seastar::make_ready_future<admin_query_result>(
                  gs::Result<seastar::sstring>(unlock_res.status()));
            }
          }
          LOG(INFO) << "Update running graph to: " << graph_name;
          auto set_res = metadata_store_->SetRunningGraph(graph_name);
          if (!set_res.ok()) {
            LOG(ERROR) << "Fail to set running graph: " << graph_name;
            if (!prev_lock) {
              metadata_store_->UnlockGraphIndices(graph_name);
            }
            return seastar::make_ready_future<admin_query_result>(
                gs::Result<seastar::sstring>(set_res.status()));
          }
        }
        graph_db_service.start_query_actors();  // start on a new scope.
        LOG(INFO) << "Successfully started service with graph: " << graph_name;
        graph_db_service.reset_start_time();
        return seastar::make_ready_future<admin_query_result>(
            gs::Result<seastar::sstring>(
                to_message_json("Successfully start service")));
      });
}