flex/engines/graph_db/database/graph_db.cc (476 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/app/builtin/count_vertices.h"
#include "flex/engines/graph_db/app/builtin/k_hop_neighbors.h"
#include "flex/engines/graph_db/app/builtin/pagerank.h"
#include "flex/engines/graph_db/app/builtin/shortest_path_among_three.h"
#include "flex/engines/graph_db/app/cypher_read_app.h"
#include "flex/engines/graph_db/app/cypher_write_app.h"
#include "flex/engines/graph_db/app/hqps_app.h"
#include "flex/engines/graph_db/app/server_app.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/engines/graph_db/database/wal/wal.h"
#include "flex/engines/graph_db/runtime/execute/plan_parser.h"
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
#include "flex/utils/yaml_utils.h"
#include "flex/third_party/httplib.h"
namespace gs {
struct SessionLocalContext {
SessionLocalContext(GraphDB& db, const std::string& work_dir, int thread_id,
MemoryStrategy allocator_strategy,
std::unique_ptr<IWalWriter> in_logger)
: allocator(allocator_strategy,
(allocator_strategy != MemoryStrategy::kSyncToFile
? ""
: thread_local_allocator_prefix(work_dir, thread_id))),
logger(std::move(in_logger)),
session(db, allocator, *logger, work_dir, thread_id) {}
~SessionLocalContext() {
if (logger) {
logger->close();
}
}
Allocator allocator;
char _padding0[128 - sizeof(Allocator) % 128];
std::unique_ptr<IWalWriter> logger;
char _padding1[4096 - sizeof(std::unique_ptr<IWalWriter>) -
sizeof(Allocator) - sizeof(_padding0)];
GraphDBSession session;
char _padding2[4096 - sizeof(GraphDBSession) % 4096];
};
GraphDB::GraphDB() = default;
GraphDB::~GraphDB() {
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
}
if (contexts_ != nullptr) {
// showAppMetrics();
for (int i = 0; i < thread_num_; ++i) {
contexts_[i].~SessionLocalContext();
}
free(contexts_);
}
WalWriterFactory::Finalize();
WalParserFactory::Finalize();
}
GraphDB& GraphDB::get() {
static GraphDB db;
return db;
}
Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
int32_t thread_num, bool warmup, bool memory_only,
bool enable_auto_compaction) {
GraphDBConfig config(schema, data_dir, "", thread_num);
config.warmup = warmup;
if (memory_only) {
config.memory_level = 1;
} else {
config.memory_level = 0;
}
config.enable_auto_compaction = enable_auto_compaction;
return Open(config);
}
Result<bool> GraphDB::Open(const GraphDBConfig& config) {
config_ = config;
const std::string& data_dir = config.data_dir;
const Schema& schema = config.schema;
if (!std::filesystem::exists(data_dir)) {
std::filesystem::create_directories(data_dir);
}
std::string schema_file = schema_path(data_dir);
bool create_empty_graph = false;
if (!std::filesystem::exists(schema_file)) {
create_empty_graph = true;
graph_.mutable_schema() = schema;
}
work_dir_ = data_dir;
thread_num_ = config.thread_num;
try {
graph_.Open(data_dir, config.memory_level);
} catch (std::exception& e) {
LOG(ERROR) << "Exception: " << e.what();
return Result<bool>(StatusCode::INTERNAL_ERROR,
"Exception: " + std::string(e.what()), false);
}
if ((!create_empty_graph) && (!graph_.schema().Equals(schema))) {
LOG(ERROR) << "Schema inconsistent..\n";
return Result<bool>(StatusCode::INTERNAL_ERROR,
"Schema of work directory is not compatible with the "
"graph schema",
false);
}
// Set the plugin info from schema to graph_.schema(), since the plugin info
// is not serialized and deserialized.
auto& mutable_schema = graph_.mutable_schema();
mutable_schema.SetPluginDir(schema.GetPluginDir());
mutable_schema.set_compiler_path(config.compiler_path);
std::vector<std::pair<std::string, std::string>> plugin_name_paths;
const auto& plugins = schema.GetPlugins();
for (auto plugin_pair : plugins) {
plugin_name_paths.emplace_back(
std::make_pair(plugin_pair.first, plugin_pair.second.first));
}
std::sort(plugin_name_paths.begin(), plugin_name_paths.end(),
[&](const std::pair<std::string, std::string>& a,
const std::pair<std::string, std::string>& b) {
return plugins.at(a.first).second < plugins.at(b.first).second;
});
mutable_schema.EmplacePlugins(plugin_name_paths);
last_compaction_ts_ = 0;
MemoryStrategy allocator_strategy = MemoryStrategy::kMemoryOnly;
if (config.memory_level == 0) {
allocator_strategy = MemoryStrategy::kSyncToFile;
} else if (config.memory_level >= 2) {
allocator_strategy = MemoryStrategy::kHugepagePrefered;
}
openWalAndCreateContexts(config, data_dir, allocator_strategy);
if ((!create_empty_graph) && config.warmup) {
graph_.Warmup(thread_num_);
}
if (config.enable_monitoring) {
if (monitor_thread_running_) {
monitor_thread_running_ = false;
monitor_thread_.join();
}
monitor_thread_running_ = true;
monitor_thread_ = std::thread([&]() {
std::vector<double> last_eval_durations(thread_num_, 0);
std::vector<int64_t> last_query_nums(thread_num_, 0);
while (monitor_thread_running_) {
sleep(10);
size_t curr_allocated_size = 0;
double total_eval_durations = 0;
double min_eval_duration = std::numeric_limits<double>::max();
double max_eval_duration = 0;
int64_t total_query_num = 0;
int64_t min_query_num = std::numeric_limits<int64_t>::max();
int64_t max_query_num = 0;
for (int i = 0; i < thread_num_; ++i) {
curr_allocated_size += contexts_[i].allocator.allocated_memory();
if (last_eval_durations[i] == 0) {
last_eval_durations[i] = contexts_[i].session.eval_duration();
} else {
double curr = contexts_[i].session.eval_duration();
double eval_duration = curr;
total_eval_durations += eval_duration;
min_eval_duration = std::min(min_eval_duration, eval_duration);
max_eval_duration = std::max(max_eval_duration, eval_duration);
last_eval_durations[i] = curr;
}
if (last_query_nums[i] == 0) {
last_query_nums[i] = contexts_[i].session.query_num();
} else {
int64_t curr = contexts_[i].session.query_num();
total_query_num += curr;
min_query_num = std::min(min_query_num, curr);
max_query_num = std::max(max_query_num, curr);
last_query_nums[i] = curr;
}
}
if (max_query_num != 0) {
double avg_eval_durations =
total_eval_durations / static_cast<double>(thread_num_);
double avg_query_num = static_cast<double>(total_query_num) /
static_cast<double>(thread_num_);
double allocated_size_in_gb =
static_cast<double>(curr_allocated_size) / 1024.0 / 1024.0 /
1024.0;
LOG(INFO) << "allocated: " << allocated_size_in_gb << " GB, eval: ["
<< min_eval_duration << ", " << avg_eval_durations << ", "
<< max_eval_duration << "] s, query num: [" << min_query_num
<< ", " << avg_query_num << ", " << max_query_num << "]";
}
}
});
}
if (config.enable_auto_compaction) {
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
}
compact_thread_running_ = true;
compact_thread_ = std::thread([&]() {
size_t last_compaction_at = 0;
while (compact_thread_running_) {
size_t query_num_before = getExecutedQueryNum();
sleep(30);
if (!compact_thread_running_) {
break;
}
size_t query_num_after = getExecutedQueryNum();
if (query_num_before == query_num_after &&
(query_num_after > (last_compaction_at + 100000))) {
VLOG(10) << "Trigger auto compaction";
last_compaction_at = query_num_after;
timestamp_t ts = this->version_manager_.acquire_update_timestamp();
auto txn =
CompactTransaction(this->graph_, *this->contexts_[0].logger,
this->version_manager_, ts);
OutputCypherProfiles("./" + std::to_string(ts) + "_");
txn.Commit();
VLOG(10) << "Finish compaction";
}
}
});
}
unlink((work_dir_ + "/statistics.json").c_str());
graph_.generateStatistics(work_dir_);
runtime::CypherRunnerImpl::get().clear_cache();
return Result<bool>(true);
}
void GraphDB::Close() {
if (monitor_thread_running_) {
monitor_thread_running_ = false;
monitor_thread_.join();
}
if (compact_thread_running_) {
compact_thread_running_ = false;
compact_thread_.join();
}
//-----------Clear graph_db----------------
graph_.Clear();
version_manager_.clear();
if (contexts_ != nullptr) {
for (int i = 0; i < thread_num_; ++i) {
contexts_[i].~SessionLocalContext();
}
free(contexts_);
contexts_ = nullptr;
}
std::fill(app_paths_.begin(), app_paths_.end(), "");
std::fill(app_factories_.begin(), app_factories_.end(), nullptr);
}
ReadTransaction GraphDB::GetReadTransaction(int thread_id) {
return contexts_[thread_id].session.GetReadTransaction();
}
InsertTransaction GraphDB::GetInsertTransaction(int thread_id) {
return contexts_[thread_id].session.GetInsertTransaction();
}
SingleVertexInsertTransaction GraphDB::GetSingleVertexInsertTransaction(
int thread_id) {
return contexts_[thread_id].session.GetSingleVertexInsertTransaction();
}
SingleEdgeInsertTransaction GraphDB::GetSingleEdgeInsertTransaction(
int thread_id) {
return contexts_[thread_id].session.GetSingleEdgeInsertTransaction();
}
UpdateTransaction GraphDB::GetUpdateTransaction(int thread_id) {
return contexts_[thread_id].session.GetUpdateTransaction();
}
GraphDBSession& GraphDB::GetSession(int thread_id) {
return contexts_[thread_id].session;
}
const GraphDBSession& GraphDB::GetSession(int thread_id) const {
return contexts_[thread_id].session;
}
int GraphDB::SessionNum() const { return thread_num_; }
void GraphDB::UpdateCompactionTimestamp(timestamp_t ts) {
last_compaction_ts_ = ts;
}
timestamp_t GraphDB::GetLastCompactionTimestamp() const {
return last_compaction_ts_;
}
AppWrapper GraphDB::CreateApp(uint8_t app_type, int thread_id) {
if (app_factories_[app_type] == nullptr) {
LOG(ERROR) << "Stored procedure " << static_cast<int>(app_type)
<< " is not registered.";
return AppWrapper(NULL, NULL);
} else {
return app_factories_[app_type]->CreateApp(*this);
}
}
bool GraphDB::registerApp(const std::string& plugin_path, uint8_t index) {
// this function will only be called when initializing the graph db
VLOG(10) << "Registering stored procedure at:" << std::to_string(index)
<< ", path:" << plugin_path;
if (!app_factories_[index] && app_paths_[index].empty()) {
app_paths_[index] = plugin_path;
app_factories_[index] =
std::make_shared<SharedLibraryAppFactory>(plugin_path);
return true;
} else {
LOG(ERROR) << "Stored procedure has already been registered at:"
<< std::to_string(index) << ", path:" << app_paths_[index];
return false;
}
}
void GraphDB::GetAppInfo(Encoder& output) {
std::string ret;
for (size_t i = 1; i != 256; ++i) {
if (!app_paths_.empty()) {
output.put_string(app_paths_[i]);
}
}
}
static void IngestWalRange(SessionLocalContext* contexts,
MutablePropertyFragment& graph,
const IWalParser& parser, uint32_t from, uint32_t to,
int thread_num) {
std::atomic<uint32_t> cur_ts(from);
std::vector<std::thread> threads(thread_num);
for (int i = 0; i < thread_num; ++i) {
threads[i] = std::thread(
[&](int tid) {
auto& alloc = contexts[tid].allocator;
while (true) {
uint32_t got_ts = cur_ts.fetch_add(1);
if (got_ts >= to) {
break;
}
const auto& unit = parser.get_insert_wal(got_ts);
InsertTransaction::IngestWal(graph, got_ts, unit.ptr, unit.size,
alloc);
if (got_ts % 1000000 == 0) {
LOG(INFO) << "Ingested " << got_ts << " WALs";
}
}
},
i);
}
for (auto& thrd : threads) {
thrd.join();
}
}
void GraphDB::ingestWals(IWalParser& parser, const std::string& work_dir,
int thread_num) {
uint32_t from_ts = 1;
for (auto& update_wal : parser.get_update_wals()) {
uint32_t to_ts = update_wal.timestamp;
if (from_ts < to_ts) {
IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num);
}
if (update_wal.size == 0) {
graph_.Compact(update_wal.timestamp);
last_compaction_ts_ = update_wal.timestamp;
} else {
UpdateTransaction::IngestWal(graph_, work_dir, to_ts, update_wal.ptr,
update_wal.size, contexts_[0].allocator);
}
from_ts = to_ts + 1;
}
if (from_ts <= parser.last_ts()) {
IngestWalRange(contexts_, graph_, parser, from_ts, parser.last_ts() + 1,
thread_num);
}
version_manager_.init_ts(parser.last_ts(), thread_num);
}
void GraphDB::initApps(
const std::unordered_map<std::string, std::pair<std::string, uint8_t>>&
plugins) {
VLOG(1) << "Initializing stored procedures, size: " << plugins.size()
<< " ...";
for (size_t i = 0; i < 256; ++i) {
app_factories_[i] = nullptr;
}
// Builtin apps
app_factories_[0] = std::make_shared<ServerAppFactory>();
app_factories_[Schema::BUILTIN_COUNT_VERTICES_PLUGIN_ID] =
std::make_shared<CountVerticesFactory>();
app_factories_[Schema::BUILTIN_PAGERANK_PLUGIN_ID] =
std::make_shared<PageRankFactory>();
app_factories_[Schema::BUILTIN_K_DEGREE_NEIGHBORS_PLUGIN_ID] =
std::make_shared<KNeighborsFactory>();
app_factories_[Schema::BUILTIN_TVSP_PLUGIN_ID] =
std::make_shared<ShortestPathAmongThreeFactory>();
app_factories_[Schema::HQPS_ADHOC_READ_PLUGIN_ID] =
std::make_shared<HQPSAdhocReadAppFactory>();
app_factories_[Schema::HQPS_ADHOC_WRITE_PLUGIN_ID] =
std::make_shared<HQPSAdhocWriteAppFactory>();
app_factories_[Schema::ADHOC_READ_PLUGIN_ID] =
std::make_shared<CypherReadAppFactory>();
app_factories_[Schema::CYPHER_READ_DEBUG_PLUGIN_ID] =
std::make_shared<CypherReadAppFactory>();
auto& parser = gs::runtime::PlanParser::get();
parser.init();
app_factories_[Schema::ADHOC_READ_PLUGIN_ID] =
std::make_shared<CypherReadAppFactory>();
app_factories_[Schema::CYPHER_READ_PLUGIN_ID] =
std::make_shared<CypherReadAppFactory>();
app_factories_[Schema::CYPHER_WRITE_PLUGIN_ID] =
std::make_shared<CypherWriteAppFactory>();
size_t valid_plugins = 0;
for (auto& path_and_index : plugins) {
auto path = path_and_index.second.first;
auto name = path_and_index.first;
auto index = path_and_index.second.second;
if (!Schema::IsBuiltinPlugin(name)) {
if (registerApp(path, index)) {
++valid_plugins;
}
} else {
valid_plugins++;
}
}
LOG(INFO) << "Successfully registered stored procedures : " << valid_plugins
<< ", from " << plugins.size();
}
void GraphDB::openWalAndCreateContexts(const GraphDBConfig& config,
const std::string& data_dir,
MemoryStrategy allocator_strategy) {
WalWriterFactory::Init();
WalParserFactory::Init();
contexts_ = static_cast<SessionLocalContext*>(
aligned_alloc(4096, sizeof(SessionLocalContext) * thread_num_));
std::filesystem::create_directories(allocator_dir(data_dir));
// Open the wal writer.
std::string wal_uri = config.wal_uri;
if (wal_uri.empty()) {
LOG(ERROR) << "wal_uri is not set, use default wal_uri";
wal_uri = wal_dir(data_dir);
} else if (wal_uri.find("{GRAPH_DATA_DIR}") != std::string::npos) {
LOG(INFO) << "Template {GRAPH_DATA_DIR} found in wal_uri, replace it with "
"data_dir";
wal_uri = std::regex_replace(wal_uri, std::regex("\\{GRAPH_DATA_DIR\\}"),
data_dir);
}
VLOG(1) << "Using wal uri: " << wal_uri;
for (int i = 0; i < thread_num_; ++i) {
new (&contexts_[i])
SessionLocalContext(*this, data_dir, i, allocator_strategy,
WalWriterFactory::CreateWalWriter(wal_uri));
}
auto wal_parser = WalParserFactory::CreateWalParser(wal_uri);
ingestWals(*wal_parser, data_dir, thread_num_);
for (int i = 0; i < thread_num_; ++i) {
contexts_[i].logger->open(wal_uri, i);
}
initApps(graph_.schema().GetPlugins());
VLOG(1) << "Successfully restore load plugins";
}
void GraphDB::showAppMetrics() const {
int session_num = SessionNum();
for (int i = 0; i < 256; ++i) {
AppMetric summary;
for (int k = 0; k < session_num; ++k) {
summary += GetSession(k).GetAppMetric(i);
}
if (!summary.empty()) {
std::string query_name = "UNKNOWN";
if (i == 0) {
query_name = "ServerApp";
} else {
query_name = "Query-" + std::to_string(i);
}
summary.output(query_name);
}
}
}
size_t GraphDB::getExecutedQueryNum() const {
size_t ret = 0;
for (int i = 0; i < thread_num_; ++i) {
ret += contexts_[i].session.query_num();
}
return ret;
}
void GraphDB::OutputCypherProfiles(const std::string& prefix) {
runtime::OprTimer read_timer, write_timer;
int session_num = SessionNum();
for (int i = 0; i < session_num; ++i) {
auto read_app_ptr = GetSession(i).GetApp(Schema::CYPHER_READ_PLUGIN_ID);
auto casted_read_app = dynamic_cast<CypherReadApp*>(read_app_ptr);
if (casted_read_app) {
read_timer += casted_read_app->timer();
}
auto write_app_ptr = GetSession(i).GetApp(Schema::CYPHER_WRITE_PLUGIN_ID);
auto casted_write_app = dynamic_cast<CypherWriteApp*>(write_app_ptr);
if (casted_write_app) {
write_timer += casted_write_app->timer();
}
}
read_timer.output(prefix + "read_profile.log");
write_timer.output(prefix + "write_profile.log");
}
} // namespace gs