flex/storages/rt_mutable_graph/mutable_property_fragment.cc (495 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"
#include "flex/engines/hqps_db/core/utils/hqps_utils.h"
#include "flex/storages/rt_mutable_graph/file_names.h"
#include "flex/utils/property/types.h"
namespace gs {
/**
 * Constructs an empty fragment that holds no vertex or edge labels.
 *
 * The label counters are zeroed here because the destructor sizes its loops
 * from them and indexes the per-label containers accordingly. Without a
 * defined value, destroying a fragment on which Open() was never called would
 * read whatever the counters happened to contain.
 * NOTE(review): the header is not visible from this file; if it already gives
 * these members default initializers, the assignments below are redundant but
 * harmless.
 */
MutablePropertyFragment::MutablePropertyFragment() {
  vertex_label_num_ = 0;
  edge_label_num_ = 0;
}
/**
 * Shrinks every vertex table and every edge CSR to the number of vertices
 * currently held by the matching indexer, then frees the CSR objects.
 */
MutablePropertyFragment::~MutablePropertyFragment() {
  // Number of vertices per label, taken from the indexers.
  std::vector<size_t> vertex_counts(vertex_label_num_, 0);
  for (size_t label = 0; label < vertex_label_num_; ++label) {
    vertex_counts[label] = lf_indexers_[label].size();
    vertex_data_[label].resize(vertex_counts[label]);
  }

  // CSR slots are addressed by the flattened (src, dst, edge) triplet.
  for (size_t src = 0; src < vertex_label_num_; ++src) {
    for (size_t dst = 0; dst < vertex_label_num_; ++dst) {
      for (size_t edge = 0; edge < edge_label_num_; ++edge) {
        const size_t slot = src * vertex_label_num_ * edge_label_num_ +
                            dst * edge_label_num_ + edge;
        auto* csr = dual_csr_list_[slot];
        if (csr == nullptr) {
          continue;
        }
        // Resize before releasing the object, as the original order requires.
        csr->Resize(vertex_counts[src], vertex_counts[dst]);
        delete csr;
      }
    }
  }
}
/**
 * Deserializes the schema stored at `schema_path` into `schema_`.
 *
 * @param schema_path Path of the serialized schema file to read.
 */
void MutablePropertyFragment::loadSchema(const std::string& schema_path) {
  auto io_adaptor = std::unique_ptr<grape::LocalIOAdaptor>(
      new grape::LocalIOAdaptor(schema_path));
  io_adaptor->Open();
  schema_.Deserialize(io_adaptor);
  // Close explicitly once reading is done, the same way DumpSchema() closes
  // its adaptor after writing, instead of leaving it to the destructor.
  io_adaptor->Close();
}
/**
 * Releases all CSR objects, empties every per-label container, resets the
 * label counters to zero and clears the schema.
 */
void MutablePropertyFragment::Clear() {
  // Deleting a null pointer is a no-op, so empty slots need no special case.
  for (auto* csr : dual_csr_list_) {
    delete csr;
  }

  lf_indexers_.clear();
  vertex_data_.clear();

  // These vectors only hold raw pointers; the objects were freed above.
  ie_.clear();
  oe_.clear();
  dual_csr_list_.clear();

  vertex_label_num_ = 0;
  edge_label_num_ = 0;
  schema_.Clear();
}
void MutablePropertyFragment::DumpSchema(const std::string& schema_path) {
auto io_adaptor = std::unique_ptr<grape::LocalIOAdaptor>(
new grape::LocalIOAdaptor(schema_path));
io_adaptor->Open("wb");
schema_.Serialize(io_adaptor);
io_adaptor->Close();
}
/**
 * Allocates the DualCsr specialization that matches an edge's property list.
 *
 * - no property          -> DualCsr<grape::EmptyType>
 * - exactly one property -> DualCsr of the matching scalar / string type
 * - several properties   -> DualCsr<RecordView>
 *
 * @param oes         Strategy forwarded as the outgoing-edge strategy.
 * @param ies         Strategy forwarded as the incoming-edge strategy.
 * @param properties  Property types of the edge.
 * @param oe_mutable  Forwarded to the DualCsr constructor.
 * @param ie_mutable  Forwarded to the DualCsr constructor.
 * @param prop_names  Property names; only used for the multi-property case.
 * @return A newly allocated DualCsr that the caller owns. An unsupported
 *         single property type is reported through LOG(FATAL).
 */
inline DualCsrBase* create_csr(EdgeStrategy oes, EdgeStrategy ies,
                               const std::vector<PropertyType>& properties,
                               bool oe_mutable, bool ie_mutable,
                               const std::vector<std::string>& prop_names) {
  if (properties.empty()) {
    return new DualCsr<grape::EmptyType>(oes, ies, oe_mutable, ie_mutable);
  }

  if (properties.size() > 1) {
    // TODO: fix me, storage strategy not set (an empty list is passed).
    return new DualCsr<RecordView>(oes, ies, prop_names, properties, {},
                                   oe_mutable, ie_mutable);
  }

  // Exactly one property from here on.
  const PropertyType& type = properties[0];
  if (type == PropertyType::kBool) {
    return new DualCsr<bool>(oes, ies, oe_mutable, ie_mutable);
  }
  if (type == PropertyType::kInt32) {
    return new DualCsr<int32_t>(oes, ies, oe_mutable, ie_mutable);
  }
  if (type == PropertyType::kUInt32) {
    return new DualCsr<uint32_t>(oes, ies, oe_mutable, ie_mutable);
  }
  if (type == PropertyType::kDate) {
    return new DualCsr<Date>(oes, ies, oe_mutable, ie_mutable);
  }
  if (type == PropertyType::kInt64) {
    return new DualCsr<int64_t>(oes, ies, oe_mutable, ie_mutable);
  }
  if (type == PropertyType::kUInt64) {
    return new DualCsr<uint64_t>(oes, ies, oe_mutable, ie_mutable);
  }
  if (type == PropertyType::kDouble) {
    return new DualCsr<double>(oes, ies, oe_mutable, ie_mutable);
  }
  if (type == PropertyType::kFloat) {
    return new DualCsr<float>(oes, ies, oe_mutable, ie_mutable);
  }
  if (type.type_enum == impl::PropertyTypeImpl::kVarChar) {
    // Varchar carries its own maximum length in the type descriptor.
    return new DualCsr<std::string_view>(
        oes, ies, type.additional_type_info.max_length, oe_mutable,
        ie_mutable);
  }
  if (type == PropertyType::kStringView) {
    return new DualCsr<std::string_view>(
        oes, ies, gs::PropertyType::GetStringDefaultMaxLength(), oe_mutable,
        ie_mutable);
  }

  LOG(FATAL) << "not support edge strategy or edge data type";
  return nullptr;
}
/**
 * Opens (or creates) the graph stored under `work_dir`.
 *
 * If the schema file of `work_dir` exists, the schema is loaded from it and
 * data is opened from the directory returned by get_latest_snapshot().
 * Otherwise the schema already held in `schema_` is used, every indexer is
 * initialized from its label's primary-key description, and an empty graph is
 * built. The temporary directory of `work_dir` is removed and recreated on
 * every call.
 *
 * @param work_dir     Root working directory of the graph.
 * @param memory_level Selects the open routine used for indexers, vertex
 *                     tables and edge CSRs:
 *                     0 - open()/Open() with snapshot and temporary dirs;
 *                     1 - open_in_memory()/OpenInMemory();
 *                     2 - open_with_hugepages(..., false) for vertices,
 *                         OpenWithHugepages() for edges;
 *                     3 - open_with_hugepages(..., true) for vertices,
 *                         OpenWithHugepages() for edges.
 *                     Any other value is only caught by the assert on the
 *                     vertex path.
 */
void MutablePropertyFragment::Open(const std::string& work_dir,
int memory_level) {
std::string schema_file = schema_path(work_dir);
std::string snapshot_dir{};
bool build_empty_graph = false;
if (std::filesystem::exists(schema_file)) {
// Existing graph: take the schema from disk and locate the latest snapshot.
loadSchema(schema_file);
vertex_label_num_ = schema_.vertex_label_num();
edge_label_num_ = schema_.edge_label_num();
lf_indexers_.resize(vertex_label_num_);
snapshot_dir = get_latest_snapshot(work_dir);
} else {
// No schema file: rely on the schema already present in schema_ and start
// from an empty graph; snapshot_dir stays empty in this case.
vertex_label_num_ = schema_.vertex_label_num();
edge_label_num_ = schema_.edge_label_num();
lf_indexers_.resize(vertex_label_num_);
build_empty_graph = true;
for (size_t i = 0; i < vertex_label_num_; ++i) {
// Uses element 0 of the first primary-key entry of label i
// (presumably the key's property type - confirm against Schema).
lf_indexers_[i].init(std::get<0>(schema_.get_vertex_primary_key(i)[0]));
}
}
vertex_data_.resize(vertex_label_num_);
// Always start from a fresh temporary directory.
std::string tmp_dir_path = tmp_dir(work_dir);
if (std::filesystem::exists(tmp_dir_path)) {
std::filesystem::remove_all(tmp_dir_path);
}
std::filesystem::create_directories(tmp_dir_path);
// Capacity chosen per vertex label; reused below to size the edge CSRs.
std::vector<size_t> vertex_capacities(vertex_label_num_, 0);
for (size_t i = 0; i < vertex_label_num_; ++i) {
std::string v_label_name = schema_.get_vertex_label_name(i);
if (memory_level == 0) {
lf_indexers_[i].open(
IndexerType::prefix() + "_" + vertex_map_prefix(v_label_name),
snapshot_dir, tmp_dir_path);
vertex_data_[i].open(vertex_table_prefix(v_label_name), snapshot_dir,
tmp_dir_path, schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name));
// Only done when data was loaded from an existing snapshot.
if (!build_empty_graph) {
vertex_data_[i].copy_to_tmp(vertex_table_prefix(v_label_name),
snapshot_dir, tmp_dir_path);
}
} else if (memory_level == 1) {
lf_indexers_[i].open_in_memory(snapshot_dir + "/" +
IndexerType::prefix() + "_" +
vertex_map_prefix(v_label_name));
vertex_data_[i].open_in_memory(
vertex_table_prefix(v_label_name), snapshot_dir,
schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name));
} else if (memory_level == 2) {
lf_indexers_[i].open_with_hugepages(snapshot_dir + "/" +
IndexerType::prefix() + "_" +
vertex_map_prefix(v_label_name),
false);
vertex_data_[i].open_with_hugepages(
vertex_table_prefix(v_label_name), snapshot_dir,
schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name), false);
} else {
// The assert is compiled out under NDEBUG, so in such builds every value
// other than 0, 1 and 2 takes this branch.
assert(memory_level == 3);
lf_indexers_[i].open_with_hugepages(snapshot_dir + "/" +
IndexerType::prefix() + "_" +
vertex_map_prefix(v_label_name),
true);
vertex_data_[i].open_with_hugepages(
vertex_table_prefix(v_label_name), snapshot_dir,
schema_.get_vertex_property_names(i),
schema_.get_vertex_properties(i),
schema_.get_vertex_storage_strategies(v_label_name), true);
}
// Reserve at least 4096 slots for each vertex label.
size_t vertex_capacity =
std::max(lf_indexers_[i].capacity(), (size_t) 4096);
if (vertex_capacity > lf_indexers_[i].capacity()) {
lf_indexers_[i].reserve(vertex_capacity);
}
vertex_data_[i].resize(vertex_capacity);
vertex_capacities[i] = vertex_capacity;
}
// One slot per (src label, dst label, edge label) triplet; slots of triplets
// that are absent from the schema stay NULL.
ie_.resize(vertex_label_num_ * vertex_label_num_ * edge_label_num_, NULL);
oe_.resize(vertex_label_num_ * vertex_label_num_ * edge_label_num_, NULL);
dual_csr_list_.resize(vertex_label_num_ * vertex_label_num_ * edge_label_num_,
NULL);
for (size_t src_label_i = 0; src_label_i != vertex_label_num_;
++src_label_i) {
std::string src_label =
schema_.get_vertex_label_name(static_cast<label_t>(src_label_i));
for (size_t dst_label_i = 0; dst_label_i != vertex_label_num_;
++dst_label_i) {
std::string dst_label =
schema_.get_vertex_label_name(static_cast<label_t>(dst_label_i));
for (size_t e_label_i = 0; e_label_i != edge_label_num_; ++e_label_i) {
std::string edge_label =
schema_.get_edge_label_name(static_cast<label_t>(e_label_i));
if (!schema_.exist(src_label, dst_label, edge_label)) {
continue;
}
// Flattened position of this triplet in ie_, oe_ and dual_csr_list_.
size_t index = src_label_i * vertex_label_num_ * edge_label_num_ +
dst_label_i * edge_label_num_ + e_label_i;
auto& properties =
schema_.get_edge_properties(src_label, dst_label, edge_label);
EdgeStrategy oe_strategy = schema_.get_outgoing_edge_strategy(
src_label, dst_label, edge_label);
EdgeStrategy ie_strategy = schema_.get_incoming_edge_strategy(
src_label, dst_label, edge_label);
bool oe_mutable =
schema_.outgoing_edge_mutable(src_label, dst_label, edge_label);
bool ie_mutable =
schema_.incoming_edge_mutable(src_label, dst_label, edge_label);
auto& prop_names =
schema_.get_edge_property_names(src_label, dst_label, edge_label);
dual_csr_list_[index] = create_csr(oe_strategy, ie_strategy, properties,
oe_mutable, ie_mutable, prop_names);
// ie_/oe_ keep the in/out CSR pointers handed out by the dual CSR.
ie_[index] = dual_csr_list_[index]->GetInCsr();
oe_[index] = dual_csr_list_[index]->GetOutCsr();
if (memory_level == 0) {
dual_csr_list_[index]->Open(
oe_prefix(src_label, dst_label, edge_label),
ie_prefix(src_label, dst_label, edge_label),
edata_prefix(src_label, dst_label, edge_label), snapshot_dir,
tmp_dir_path);
} else if (memory_level >= 2) {
dual_csr_list_[index]->OpenWithHugepages(
oe_prefix(src_label, dst_label, edge_label),
ie_prefix(src_label, dst_label, edge_label),
edata_prefix(src_label, dst_label, edge_label), snapshot_dir,
vertex_capacities[src_label_i], vertex_capacities[dst_label_i]);
} else {
// Reached for memory_level == 1 (and for any negative value).
dual_csr_list_[index]->OpenInMemory(
oe_prefix(src_label, dst_label, edge_label),
ie_prefix(src_label, dst_label, edge_label),
edata_prefix(src_label, dst_label, edge_label), snapshot_dir,
vertex_capacities[src_label_i], vertex_capacities[dst_label_i]);
}
// Size the CSR with the capacities chosen for its endpoint labels.
dual_csr_list_[index]->Resize(vertex_capacities[src_label_i],
vertex_capacities[dst_label_i]);
}
}
}
}
/**
 * Calls SortByEdgeData(version) on every existing edge CSR whose triplet has
 * sort-on-compaction enabled in the schema.
 *
 * @param version Value forwarded to SortByEdgeData().
 */
void MutablePropertyFragment::Compact(uint32_t version) {
  for (size_t src = 0; src < vertex_label_num_; ++src) {
    const std::string src_name =
        schema_.get_vertex_label_name(static_cast<label_t>(src));
    for (size_t dst = 0; dst < vertex_label_num_; ++dst) {
      const std::string dst_name =
          schema_.get_vertex_label_name(static_cast<label_t>(dst));
      for (size_t edge = 0; edge < edge_label_num_; ++edge) {
        const std::string edge_name =
            schema_.get_edge_label_name(static_cast<label_t>(edge));
        if (!schema_.exist(src_name, dst_name, edge_name)) {
          continue;
        }
        const size_t slot = src * vertex_label_num_ * edge_label_num_ +
                            dst * edge_label_num_ + edge;
        auto* csr = dual_csr_list_[slot];
        // The schema is only consulted for slots that actually hold a CSR.
        if (csr != nullptr &&
            schema_.get_sort_on_compaction(src_name, dst_name, edge_name)) {
          csr->SortByEdgeData(version);
        }
      }
    }
  }
}
void MutablePropertyFragment::Dump(const std::string& work_dir,
uint32_t version) {
std::string snapshot_dir_path = snapshot_dir(work_dir, version);
std::error_code errorCode;
std::filesystem::create_directories(snapshot_dir_path, errorCode);
if (errorCode) {
std::stringstream ss;
ss << "Failed to create snapshot directory: " << snapshot_dir_path << ", "
<< errorCode.message();
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
std::vector<size_t> vertex_num(vertex_label_num_, 0);
for (size_t i = 0; i < vertex_label_num_; ++i) {
vertex_num[i] = lf_indexers_[i].size();
lf_indexers_[i].dump(
IndexerType::prefix() + "_" +
vertex_map_prefix(schema_.get_vertex_label_name(i)),
snapshot_dir_path);
vertex_data_[i].resize(vertex_num[i]);
vertex_data_[i].dump(vertex_table_prefix(schema_.get_vertex_label_name(i)),
snapshot_dir_path);
}
for (size_t src_label_i = 0; src_label_i != vertex_label_num_;
++src_label_i) {
std::string src_label =
schema_.get_vertex_label_name(static_cast<label_t>(src_label_i));
for (size_t dst_label_i = 0; dst_label_i != vertex_label_num_;
++dst_label_i) {
std::string dst_label =
schema_.get_vertex_label_name(static_cast<label_t>(dst_label_i));
for (size_t e_label_i = 0; e_label_i != edge_label_num_; ++e_label_i) {
std::string edge_label =
schema_.get_edge_label_name(static_cast<label_t>(e_label_i));
if (!schema_.exist(src_label, dst_label, edge_label)) {
continue;
}
size_t index = src_label_i * vertex_label_num_ * edge_label_num_ +
dst_label_i * edge_label_num_ + e_label_i;
if (dual_csr_list_[index] != NULL) {
dual_csr_list_[index]->Resize(vertex_num[src_label_i],
vertex_num[dst_label_i]);
if (schema_.get_sort_on_compaction(src_label, dst_label,
edge_label)) {
dual_csr_list_[index]->SortByEdgeData(version + 1);
}
dual_csr_list_[index]->Dump(
oe_prefix(src_label, dst_label, edge_label),
ie_prefix(src_label, dst_label, edge_label),
edata_prefix(src_label, dst_label, edge_label),
snapshot_dir_path);
}
}
}
}
set_snapshot_version(work_dir, version);
}
/**
 * Calls Warmup() on every existing edge CSR and warmup() on every indexer,
 * then logs the elapsed time.
 *
 * @param thread_num Value forwarded to each warm-up call.
 */
void MutablePropertyFragment::Warmup(int thread_num) {
  const double started_at = grape::GetCurrentTime();

  for (auto* csr : dual_csr_list_) {
    if (csr == nullptr) {
      continue;
    }
    csr->Warmup(thread_num);
  }
  for (size_t i = 0; i < lf_indexers_.size(); ++i) {
    lf_indexers_[i].warmup(thread_num);
  }

  const double elapsed = grape::GetCurrentTime() - started_at;
  LOG(INFO) << "Warmup takes: " << elapsed << " s";
}
/**
 * Forwards an edge ingestion to the CSR registered for the
 * (src_label, dst_label, edge_label) triplet.
 *
 * The CSR slot is dereferenced without a null check, so the triplet must
 * refer to a CSR that has been created.
 *
 * @param src_label  Label of the source vertex.
 * @param src_lid    Id of the source vertex, forwarded to the CSR.
 * @param dst_label  Label of the destination vertex.
 * @param dst_lid    Id of the destination vertex, forwarded to the CSR.
 * @param edge_label Label of the edge.
 * @param ts         Timestamp forwarded to the CSR.
 * @param arc        Archive forwarded to the CSR.
 * @param alloc      Allocator forwarded to the CSR.
 */
void MutablePropertyFragment::IngestEdge(label_t src_label, vid_t src_lid,
                                         label_t dst_label, vid_t dst_lid,
                                         label_t edge_label, timestamp_t ts,
                                         grape::OutArchive& arc,
                                         Allocator& alloc) {
  const size_t slot = src_label * vertex_label_num_ * edge_label_num_ +
                      dst_label * edge_label_num_ + edge_label;
  auto* csr = dual_csr_list_[slot];
  csr->IngestEdge(src_lid, dst_lid, arc, ts, alloc);
}
/**
 * Forwards an edge update to the CSR registered for the
 * (src_label, dst_label, edge_label) triplet.
 *
 * The CSR slot is dereferenced without a null check, so the triplet must
 * refer to a CSR that has been created.
 *
 * @param src_label  Label of the source vertex.
 * @param src_lid    Id of the source vertex, forwarded to the CSR.
 * @param dst_label  Label of the destination vertex.
 * @param dst_lid    Id of the destination vertex, forwarded to the CSR.
 * @param edge_label Label of the edge.
 * @param ts         Timestamp forwarded to the CSR.
 * @param arc        Value forwarded to the CSR.
 * @param alloc      Allocator forwarded to the CSR.
 */
void MutablePropertyFragment::UpdateEdge(label_t src_label, vid_t src_lid,
                                         label_t dst_label, vid_t dst_lid,
                                         label_t edge_label, timestamp_t ts,
                                         const Any& arc, Allocator& alloc) {
  const size_t slot = src_label * vertex_label_num_ * edge_label_num_ +
                      dst_label * edge_label_num_ + edge_label;
  auto* csr = dual_csr_list_[slot];
  csr->UpdateEdge(src_lid, dst_lid, arc, ts, alloc);
}
/** Read-only access to the schema held by this fragment. */
const Schema& MutablePropertyFragment::schema() const {
  return schema_;
}
/** Mutable access to the schema held by this fragment. */
Schema& MutablePropertyFragment::mutable_schema() {
  return schema_;
}
/**
 * Returns the size of the indexer of `vertex_label`, converted to vid_t.
 *
 * @param vertex_label Label whose indexer is queried.
 */
vid_t MutablePropertyFragment::vertex_num(label_t vertex_label) const {
  const auto& indexer = lf_indexers_[vertex_label];
  return static_cast<vid_t>(indexer.size());
}
/**
 * Returns EdgeNum() of the CSR registered for the triplet, or 0 when no CSR
 * is registered for it.
 *
 * Note the parameter order: the edge label comes before the destination
 * label.
 *
 * @param src_label  Label of the source vertices.
 * @param edge_label Label of the edges.
 * @param dst_label  Label of the destination vertices.
 */
size_t MutablePropertyFragment::edge_num(label_t src_label, label_t edge_label,
                                         label_t dst_label) const {
  const size_t slot = src_label * vertex_label_num_ * edge_label_num_ +
                      dst_label * edge_label_num_ + edge_label;
  auto* csr = dual_csr_list_[slot];
  if (csr == nullptr) {
    return 0;
  }
  return csr->EdgeNum();
}
/**
 * Looks `oid` up in the indexer of `label`.
 *
 * @param label Label whose indexer is searched.
 * @param oid   Key to look up.
 * @param lid   Output argument handed to the indexer's get_index().
 * @return Whatever the indexer's get_index() returns.
 */
bool MutablePropertyFragment::get_lid(label_t label, const Any& oid,
                                      vid_t& lid) const {
  const auto& indexer = lf_indexers_[label];
  return indexer.get_index(oid, lid);
}
/**
 * Returns the key that the indexer of `label` stores for `lid`.
 *
 * @param label Label whose indexer is queried.
 * @param lid   Id passed to the indexer's get_key().
 */
Any MutablePropertyFragment::get_oid(label_t label, vid_t lid) const {
  const auto& indexer = lf_indexers_[label];
  return indexer.get_key(lid);
}
/**
 * Inserts `id` into the indexer of `label`.
 *
 * @param label Label whose indexer receives the key.
 * @param id    Key to insert.
 * @return The value returned by the indexer's insert().
 */
vid_t MutablePropertyFragment::add_vertex(label_t label, const Any& id) {
  auto& indexer = lf_indexers_[label];
  return indexer.insert(id);
}
/**
 * Returns the edge iterator that the outgoing CSR selected by
 * (label, neighbor_label, edge_label) produces for vertex `u`.
 */
std::shared_ptr<CsrConstEdgeIterBase>
MutablePropertyFragment::get_outgoing_edges(label_t label, vid_t u,
                                            label_t neighbor_label,
                                            label_t edge_label) const {
  auto* out_csr = get_oe_csr(label, neighbor_label, edge_label);
  return out_csr->edge_iter(u);
}
/**
 * Returns the edge iterator that the incoming CSR selected by
 * (label, neighbor_label, edge_label) produces for vertex `u`.
 */
std::shared_ptr<CsrConstEdgeIterBase>
MutablePropertyFragment::get_incoming_edges(label_t label, vid_t u,
                                            label_t neighbor_label,
                                            label_t edge_label) const {
  auto* in_csr = get_ie_csr(label, neighbor_label, edge_label);
  return in_csr->edge_iter(u);
}
/**
 * Returns the raw-pointer edge iterator that the outgoing CSR selected by
 * (label, neighbor_label, edge_label) produces for vertex `u`.
 */
CsrConstEdgeIterBase* MutablePropertyFragment::get_outgoing_edges_raw(
    label_t label, vid_t u, label_t neighbor_label, label_t edge_label) const {
  auto* out_csr = get_oe_csr(label, neighbor_label, edge_label);
  return out_csr->edge_iter_raw(u);
}
/**
 * Returns the raw-pointer edge iterator that the incoming CSR selected by
 * (label, neighbor_label, edge_label) produces for vertex `u`.
 */
CsrConstEdgeIterBase* MutablePropertyFragment::get_incoming_edges_raw(
    label_t label, vid_t u, label_t neighbor_label, label_t edge_label) const {
  auto* in_csr = get_ie_csr(label, neighbor_label, edge_label);
  return in_csr->edge_iter_raw(u);
}
/**
 * Returns the mutable edge iterator that the outgoing CSR selected by
 * (label, neighbor_label, edge_label) produces for vertex `u`.
 */
std::shared_ptr<CsrEdgeIterBase>
MutablePropertyFragment::get_outgoing_edges_mut(label_t label, vid_t u,
                                                label_t neighbor_label,
                                                label_t edge_label) {
  auto* out_csr = get_oe_csr(label, neighbor_label, edge_label);
  return out_csr->edge_iter_mut(u);
}
/**
 * Returns the mutable edge iterator that the incoming CSR selected by
 * (label, neighbor_label, edge_label) produces for vertex `u`.
 */
std::shared_ptr<CsrEdgeIterBase>
MutablePropertyFragment::get_incoming_edges_mut(label_t label, vid_t u,
                                                label_t neighbor_label,
                                                label_t edge_label) {
  auto* in_csr = get_ie_csr(label, neighbor_label, edge_label);
  return in_csr->edge_iter_mut(u);
}
void MutablePropertyFragment::generateStatistics(
const std::string& work_dir) const {
std::string filename = work_dir + "/statistics.json";
size_t vertex_count = 0;
std::string ss = "\"vertex_type_statistics\": [\n";
size_t vertex_label_num = schema_.vertex_label_num();
for (size_t idx = 0; idx < vertex_label_num; ++idx) {
ss += "{\n\"type_id\": " + std::to_string(idx) + ", \n";
ss += "\"type_name\": \"" + schema_.get_vertex_label_name(idx) + "\", \n";
size_t count = lf_indexers_[idx].size();
ss += "\"count\": " + std::to_string(count) + "\n}";
vertex_count += count;
if (idx != vertex_label_num - 1) {
ss += ", \n";
} else {
ss += "\n";
}
}
ss += "]\n";
size_t edge_count = 0;
size_t edge_label_num = schema_.edge_label_num();
std::vector<std::thread> count_threads;
std::vector<size_t> edge_count_list(dual_csr_list_.size(), 0);
for (size_t src_label = 0; src_label < vertex_label_num; ++src_label) {
const auto& src_label_name = schema_.get_vertex_label_name(src_label);
for (size_t dst_label = 0; dst_label < vertex_label_num; ++dst_label) {
const auto& dst_label_name = schema_.get_vertex_label_name(dst_label);
for (size_t edge_label = 0; edge_label < edge_label_num; ++edge_label) {
const auto& edge_label_name = schema_.get_edge_label_name(edge_label);
if (schema_.exist(src_label_name, dst_label_name, edge_label_name)) {
size_t index = src_label * vertex_label_num * edge_label_num +
dst_label * edge_label_num + edge_label;
if (dual_csr_list_[index] != NULL) {
count_threads.emplace_back([&edge_count_list, index, this] {
edge_count_list[index] = dual_csr_list_[index]->EdgeNum();
});
}
}
}
}
}
for (auto& t : count_threads) {
t.join();
}
ss += ",\n";
ss += "\"edge_type_statistics\": [";
for (size_t edge_label = 0; edge_label < edge_label_num; ++edge_label) {
const auto& edge_label_name = schema_.get_edge_label_name(edge_label);
ss += "{\n\"type_id\": " + std::to_string(edge_label) + ", \n";
ss += "\"type_name\": \"" + edge_label_name + "\", \n";
ss += "\"vertex_type_pair_statistics\": [\n";
bool first = true;
std::string props_content{};
for (size_t src_label = 0; src_label < vertex_label_num; ++src_label) {
const auto& src_label_name = schema_.get_vertex_label_name(src_label);
for (size_t dst_label = 0; dst_label < vertex_label_num; ++dst_label) {
const auto& dst_label_name = schema_.get_vertex_label_name(dst_label);
size_t index = src_label * vertex_label_num * edge_label_num +
dst_label * edge_label_num + edge_label;
if (schema_.exist(src_label_name, dst_label_name, edge_label_name)) {
if (!first) {
ss += ",\n";
}
first = false;
ss += "{\n\"source_vertex\" : \"" + src_label_name + "\", \n";
ss += "\"destination_vertex\" : \"" + dst_label_name + "\", \n";
ss += "\"count\" : " + std::to_string(edge_count_list[index]) + "\n";
edge_count += edge_count_list[index];
ss += "}";
}
}
}
ss += "\n]\n}";
if (edge_label != edge_label_num - 1) {
ss += ", \n";
} else {
ss += "\n";
}
}
ss += "]\n";
{
std::ofstream out(filename);
out << "{\n\"total_vertex_count\": " << vertex_count << ",\n";
out << "\"total_edge_count\": " << edge_count << ",\n";
out << ss;
out << "}\n";
out.close();
}
}
} // namespace gs