flex/engines/graph_db/database/graph_db_session.cc (253 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <chrono> #include "flex/engines/graph_db/app/app_base.h" #include "flex/engines/graph_db/database/graph_db.h" #include "flex/engines/graph_db/database/graph_db_session.h" #include "flex/utils/app_utils.h" #include "flex/proto_generated_gie/stored_procedure.pb.h" #include "service_utils.h" #include <rapidjson/document.h> namespace gs { ReadTransaction GraphDBSession::GetReadTransaction() const { uint32_t ts = db_.version_manager_.acquire_read_timestamp(); return ReadTransaction(*this, db_.graph_, db_.version_manager_, ts); } InsertTransaction GraphDBSession::GetInsertTransaction() { uint32_t ts = db_.version_manager_.acquire_insert_timestamp(); return InsertTransaction(*this, db_.graph_, alloc_, logger_, db_.version_manager_, ts); } SingleVertexInsertTransaction GraphDBSession::GetSingleVertexInsertTransaction() { uint32_t ts = db_.version_manager_.acquire_insert_timestamp(); return SingleVertexInsertTransaction(db_.graph_, alloc_, logger_, db_.version_manager_, ts); } SingleEdgeInsertTransaction GraphDBSession::GetSingleEdgeInsertTransaction() { uint32_t ts = db_.version_manager_.acquire_insert_timestamp(); return SingleEdgeInsertTransaction(db_.graph_, alloc_, logger_, db_.version_manager_, ts); } UpdateTransaction GraphDBSession::GetUpdateTransaction() { uint32_t ts = db_.version_manager_.acquire_update_timestamp(); return UpdateTransaction(*this, db_.graph_, alloc_, work_dir_, logger_, db_.version_manager_, ts); } bool GraphDBSession::BatchUpdate(UpdateBatch& batch) { return GetUpdateTransaction().batch_commit(batch); } const MutablePropertyFragment& GraphDBSession::graph() const { return db_.graph(); } const GraphDB& GraphDBSession::db() const { return db_; } MutablePropertyFragment& GraphDBSession::graph() { return db_.graph(); } const Schema& GraphDBSession::schema() const { return db_.schema(); } std::shared_ptr<ColumnBase> GraphDBSession::get_vertex_property_column( uint8_t label, const std::string& col_name) const { return db_.get_vertex_property_column(label, col_name); } std::shared_ptr<RefColumnBase> GraphDBSession::get_vertex_id_column( uint8_t label) const { if (db_.graph().lf_indexers_[label].get_type() == PropertyType::kInt64) { return std::make_shared<TypedRefColumn<int64_t>>( dynamic_cast<const TypedColumn<int64_t>&>( db_.graph().lf_indexers_[label].get_keys())); } else if (db_.graph().lf_indexers_[label].get_type() == PropertyType::kInt32) { return std::make_shared<TypedRefColumn<int32_t>>( dynamic_cast<const TypedColumn<int32_t>&>( db_.graph().lf_indexers_[label].get_keys())); } else if (db_.graph().lf_indexers_[label].get_type() == PropertyType::kUInt64) { return std::make_shared<TypedRefColumn<uint64_t>>( dynamic_cast<const TypedColumn<uint64_t>&>( db_.graph().lf_indexers_[label].get_keys())); } else if (db_.graph().lf_indexers_[label].get_type() == PropertyType::kUInt32) { return std::make_shared<TypedRefColumn<uint32_t>>( dynamic_cast<const TypedColumn<uint32_t>&>( db_.graph().lf_indexers_[label].get_keys())); } else if (db_.graph().lf_indexers_[label].get_type() == PropertyType::kStringView) { return std::make_shared<TypedRefColumn<std::string_view>>( dynamic_cast<const TypedColumn<std::string_view>&>( db_.graph().lf_indexers_[label].get_keys())); } else { return nullptr; } } Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) { const auto start = std::chrono::high_resolution_clock::now(); if (input.size() < 2) { return Result<std::vector<char>>( StatusCode::INVALID_ARGUMENT, "Invalid input, input size: " + std::to_string(input.size()), std::vector<char>()); } auto type_res = parse_query_type(input); if (!type_res.ok()) { LOG(ERROR) << "Fail to parse query type"; return Result<std::vector<char>>(type_res.status(), std::vector<char>()); } uint8_t type; std::string_view sv; std::tie(type, sv) = type_res.value(); std::vector<char> result_buffer; Encoder encoder(result_buffer); Decoder decoder(sv.data(), sv.size()); AppBase* app = GetApp(type); if (!app) { return Result<std::vector<char>>( StatusCode::NOT_FOUND, "Procedure not found, id:" + std::to_string((int) type), result_buffer); } for (size_t i = 0; i < MAX_RETRY; ++i) { result_buffer.clear(); if (app->run(*this, decoder, encoder)) { const auto end = std::chrono::high_resolution_clock::now(); app_metrics_[type].add_record( std::chrono::duration_cast<std::chrono::microseconds>(end - start) .count()); eval_duration_.fetch_add( std::chrono::duration_cast<std::chrono::microseconds>(end - start) .count()); ++query_num_; return result_buffer; } LOG(INFO) << "[Query-" << (int) type << "][Thread-" << thread_id_ << "] retry - " << i << " / " << MAX_RETRY; if (i + 1 < MAX_RETRY) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } decoder.reset(sv.data(), sv.size()); } const auto end = std::chrono::high_resolution_clock::now(); eval_duration_.fetch_add( std::chrono::duration_cast<std::chrono::microseconds>(end - start) .count()); ++query_num_; // When query failed, we assume the user may put the error message in the // output buffer. // For example, for adhoc_app.cc, if the query failed, the error info will // be put in the output buffer. if (result_buffer.size() > 4) { return Result<std::vector<char>>( StatusCode::QUERY_FAILED, std::string{result_buffer.data() + 4, result_buffer.size() - 4}, // The first 4 bytes are the length of the message. result_buffer); } else { return Result<std::vector<char>>( StatusCode::QUERY_FAILED, "Query failed for procedure id:" + std::to_string((int) type), result_buffer); } } void GraphDBSession::GetAppInfo(Encoder& result) { db_.GetAppInfo(result); } int GraphDBSession::SessionId() const { return thread_id_; } CompactTransaction GraphDBSession::GetCompactTransaction() { timestamp_t ts = db_.version_manager_.acquire_update_timestamp(); return CompactTransaction(db_.graph_, logger_, db_.version_manager_, ts); } bool GraphDBSession::Compact() { auto txn = GetCompactTransaction(); if (txn.timestamp() > db_.GetLastCompactionTimestamp() + 100000) { db_.UpdateCompactionTimestamp(txn.timestamp()); txn.Commit(); return true; } else { txn.Abort(); return false; } } double GraphDBSession::eval_duration() const { return static_cast<double>(eval_duration_.load()) / 1000000.0; } int64_t GraphDBSession::query_num() const { return query_num_.load(); } AppBase* GraphDBSession::GetApp(const std::string& app_name) { auto& app_name_to_path_index = db_.schema().GetPlugins(); if (app_name_to_path_index.count(app_name) <= 0) { LOG(ERROR) << "Query name is not registered: " << app_name; return nullptr; } return GetApp(app_name_to_path_index.at(app_name).second); } #define likely(x) __builtin_expect(!!(x), 1) AppBase* GraphDBSession::GetApp(int type) { // create if not exist if (type >= GraphDBSession::MAX_PLUGIN_NUM) { LOG(ERROR) << "Query type is out of range: " << type << " > " << GraphDBSession::MAX_PLUGIN_NUM; return nullptr; } AppBase* app = nullptr; if (likely(apps_[type] != nullptr)) { app = apps_[type]; } else { app_wrappers_[type] = db_.CreateApp(type, thread_id_); if (app_wrappers_[type].app() == NULL) { LOG(ERROR) << "[Query-" + std::to_string((int) type) << "] is not registered..."; return nullptr; } else { apps_[type] = app_wrappers_[type].app(); app = apps_[type]; } } return app; } #undef likely // likely Result<std::pair<uint8_t, std::string_view>> GraphDBSession::parse_query_type_from_cypher_json( const std::string_view& str_view) { VLOG(10) << "string view: " << str_view; rapidjson::Document j; if (j.Parse(std::string(str_view.data(), str_view.size() - 1)) .HasParseError()) { LOG(ERROR) << "Fail to parse json from input content"; return Result<std::pair<uint8_t, std::string_view>>(gs::Status( StatusCode::INTERNAL_ERROR, "Fail to parse json from input content")); } std::string query_name = j["query_name"].GetString(); const auto& app_name_to_path_index = schema().GetPlugins(); if (app_name_to_path_index.count(query_name) <= 0) { LOG(ERROR) << "Query name is not registered: " << query_name; return Result<std::pair<uint8_t, std::string_view>>(gs::Status( StatusCode::NOT_FOUND, "Query name is not registered: " + query_name)); } if (j.HasMember("arguments")) { for (auto& arg : j["arguments"].GetArray()) { VLOG(10) << "arg: " << jsonToString(arg); } } VLOG(10) << "Query name: " << query_name; return std::make_pair(app_name_to_path_index.at(query_name).second, str_view); } Result<std::pair<uint8_t, std::string_view>> GraphDBSession::parse_query_type_from_cypher_internal( const std::string_view& str_view) { procedure::Query cur_query; if (!cur_query.ParseFromArray(str_view.data(), str_view.size() - 1)) { LOG(ERROR) << "Fail to parse query from input content"; return Result<std::pair<uint8_t, std::string_view>>(gs::Status( StatusCode::INTERNAL_ERROR, "Fail to parse query from input content")); } auto query_name = cur_query.query_name().name(); if (query_name.empty()) { LOG(ERROR) << "Query name is empty"; return Result<std::pair<uint8_t, std::string_view>>( gs::Status(StatusCode::NOT_FOUND, "Query name is empty")); } const auto& app_name_to_path_index = schema().GetPlugins(); // First check whether the query name is builtin query for (int i = 0; i < Schema::BUILTIN_PLUGIN_NUM; ++i) { std::string builtin_query_name = Schema::BUILTIN_PLUGIN_NAMES[i]; if (query_name == builtin_query_name) { return std::make_pair(Schema::BUILTIN_PLUGIN_IDS[i], str_view); } } if (app_name_to_path_index.count(query_name) <= 0) { LOG(ERROR) << "Query name is not registered: " << query_name; return Result<std::pair<uint8_t, std::string_view>>(gs::Status( StatusCode::NOT_FOUND, "Query name is not registered: " + query_name)); } return std::make_pair(app_name_to_path_index.at(query_name).second, str_view); } const AppMetric& GraphDBSession::GetAppMetric(int idx) const { return app_metrics_[idx]; } } // namespace gs