flex/storages/rt_mutable_graph/csr/mutable_csr.h (1,106 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef STORAGES_RT_MUTABLE_GRAPH_CSR_MUTABLE_CSR_H_
#define STORAGES_RT_MUTABLE_GRAPH_CSR_MUTABLE_CSR_H_
#include <thread>
#include "grape/utils/concurrent_queue.h"
#include "flex/storages/rt_mutable_graph/csr/adj_list.h"
#include "flex/storages/rt_mutable_graph/csr/csr_base.h"
#include "flex/storages/rt_mutable_graph/csr/nbr.h"
namespace gs {
// Declared here, defined elsewhere. In this header it is called as
// read_file(path, dst, sizeof(T), count) to fill `buffer` from `filename`;
// presumably `size` is the element size in bytes and `num` the element
// count -- NOTE(review): confirm against the definition.
void read_file(const std::string& filename, void* buffer, size_t size,
size_t num);
// Declared here, defined elsewhere. In this header it is called as
// write_file(path, src, sizeof(T), count) to persist `buffer` to `filename`;
// presumably `size` is the element size in bytes and `num` the element
// count -- NOTE(review): confirm against the definition.
void write_file(const std::string& filename, const void* buffer, size_t size,
size_t num);
/**
 * Read-only edge iterator over a MutableNbrSlice<EDATA_T>, exposed through
 * the CsrConstEdgeIterBase interface.
 *
 * The iterator walks the half-open range [slice.begin(), slice.end()).
 */
template <typename EDATA_T>
class MutableCsrConstEdgeIter : public CsrConstEdgeIterBase {
  using const_nbr_ptr_t = typename MutableNbrSlice<EDATA_T>::const_nbr_ptr_t;

 public:
  explicit MutableCsrConstEdgeIter(const MutableNbrSlice<EDATA_T>& slice)
      : cur_(slice.begin()), end_(slice.end()) {}
  ~MutableCsrConstEdgeIter() = default;

  /// Neighbor vertex of the current edge.
  vid_t get_neighbor() const override { return (*cur_).get_neighbor(); }

  /// Data of the current edge, wrapped into an Any.
  Any get_data() const override {
    return AnyConverter<EDATA_T>::to_any((*cur_).get_data());
  }

  /// Timestamp of the current edge.
  timestamp_t get_timestamp() const override { return (*cur_).get_timestamp(); }

  /// Moves to the following edge (no bounds check).
  void next() override { ++cur_; }

  /// Advances by `offset` edges, stopping at the end of the slice.
  CsrConstEdgeIterBase& operator+=(size_t offset) override {
    // Compare against the remaining distance *before* moving the cursor:
    // stepping first and comparing afterwards would form a position beyond
    // one-past-the-end, which is undefined and may wrap for huge offsets.
    if (offset >= static_cast<size_t>(end_ - cur_)) {
      cur_ = end_;
    } else {
      cur_ += offset;
    }
    return *this;
  }

  /// True while the cursor has not reached the end of the slice.
  bool is_valid() const override { return cur_ != end_; }

  /// Number of edges between the cursor and the end of the slice.
  size_t size() const override { return end_ - cur_; }

 private:
  const_nbr_ptr_t cur_;
  const_nbr_ptr_t end_;
};
/**
 * Mutable edge iterator over a MutableNbrSliceMut<EDATA_T>, exposed through
 * the CsrEdgeIterBase interface. Works directly on MutableNbr<EDATA_T>
 * entries in the half-open range [slice.begin(), slice.end()).
 */
template <typename EDATA_T>
class MutableCsrEdgeIter : public CsrEdgeIterBase {
  using nbr_t = MutableNbr<EDATA_T>;

 public:
  explicit MutableCsrEdgeIter(MutableNbrSliceMut<EDATA_T> slice)
      : cur_(slice.begin()), end_(slice.end()) {}
  ~MutableCsrEdgeIter() = default;

  /// Neighbor vertex of the current edge.
  vid_t get_neighbor() const override { return cur_->neighbor; }

  /// Data of the current edge, wrapped into an Any.
  Any get_data() const override {
    return AnyConverter<EDATA_T>::to_any(cur_->data);
  }

  /// Timestamp of the current edge.
  timestamp_t get_timestamp() const override { return cur_->timestamp.load(); }

  /// Overwrites the data of the current edge, then stores `ts` as its
  /// timestamp.
  void set_data(const Any& value, timestamp_t ts) override {
    ConvertAny<EDATA_T>::to(value, cur_->data);
    cur_->timestamp.store(ts);
  }

  /// Advances by `offset` edges, stopping at the end of the slice.
  CsrEdgeIterBase& operator+=(size_t offset) override {
    // Compare distances instead of computing `cur_ + offset`: forming a
    // pointer past one-past-the-end is undefined behaviour and can wrap
    // around for very large offsets.
    if (offset >= static_cast<size_t>(end_ - cur_)) {
      cur_ = end_;
    } else {
      cur_ += offset;
    }
    return *this;
  }

  /// Moves to the following edge (no bounds check).
  void next() override { ++cur_; }

  /// True while the cursor has not reached the end of the slice.
  bool is_valid() const override { return cur_ != end_; }

  /// Number of edges between the cursor and the end of the slice.
  size_t size() const override { return end_ - cur_; }

 private:
  nbr_t* cur_;
  nbr_t* end_;
};
/**
 * Mutable edge iterator specialisation for std::string_view edge data.
 * The cursor type comes from MutableNbrSliceMut<std::string_view>.
 */
template <>
class MutableCsrEdgeIter<std::string_view> : public CsrEdgeIterBase {
  using nbr_ptr_t = typename MutableNbrSliceMut<std::string_view>::nbr_ptr_t;

 public:
  explicit MutableCsrEdgeIter(MutableNbrSliceMut<std::string_view> slice)
      : cur_(slice.begin()), end_(slice.end()) {}
  ~MutableCsrEdgeIter() = default;

  /// Neighbor vertex of the current edge.
  vid_t get_neighbor() const override { return cur_.get_neighbor(); }

  /// String data of the current edge, wrapped into an Any.
  Any get_data() const override {
    return AnyConverter<std::string_view>::to_any(cur_.get_data());
  }

  /// Timestamp of the current edge.
  timestamp_t get_timestamp() const override { return cur_.get_timestamp(); }

  /// Sets the string data and timestamp of the current edge.
  void set_data(const Any& value, timestamp_t ts) override {
    cur_.set_data(value.AsStringView(), ts);
  }

  /// Index reported by the cursor for the current edge.
  size_t get_index() const { return cur_.get_index(); }

  /// Sets only the timestamp of the current edge.
  void set_timestamp(timestamp_t ts) { cur_.set_timestamp(ts); }

  /// Advances by `offset` edges, stopping at the end of the slice.
  CsrEdgeIterBase& operator+=(size_t offset) override {
    // Check the remaining distance first so the underlying pointer is never
    // moved beyond the end of the slice (undefined, and may wrap for large
    // offsets).
    if (offset >= static_cast<size_t>(end_.ptr_ - cur_.ptr_)) {
      cur_ = end_;
    } else {
      cur_ += offset;
    }
    return *this;
  }

  /// Moves to the following edge (no bounds check).
  void next() override { ++cur_; }

  /// True while the cursor has not reached the end of the slice.
  bool is_valid() const override { return cur_ != end_; }

  /// Number of edges between the cursor and the end of the slice.
  size_t size() const override { return end_.ptr_ - cur_.ptr_; }

 private:
  nbr_ptr_t cur_;
  nbr_ptr_t end_;
};
/**
 * Mutable edge iterator specialisation for RecordView edge data.
 * The cursor type comes from MutableNbrSliceMut<RecordView>.
 */
template <>
class MutableCsrEdgeIter<RecordView> : public CsrEdgeIterBase {
  using nbr_ptr_t = typename MutableNbrSliceMut<RecordView>::nbr_ptr_t;

 public:
  explicit MutableCsrEdgeIter(MutableNbrSliceMut<RecordView> slice)
      : cur_(slice.begin()), end_(slice.end()) {}
  ~MutableCsrEdgeIter() = default;

  /// Neighbor vertex of the current edge.
  vid_t get_neighbor() const override { return cur_->get_neighbor(); }

  /// Record data of the current edge, wrapped into an Any.
  Any get_data() const override {
    return AnyConverter<RecordView>::to_any(cur_->get_data());
  }

  /// Index reported by the cursor for the current edge.
  size_t get_index() const { return cur_.get_index(); }

  /// Timestamp of the current edge.
  timestamp_t get_timestamp() const override { return cur_->get_timestamp(); }

  /// Sets only the timestamp of the current edge.
  void set_timestamp(timestamp_t ts) { cur_->set_timestamp(ts); }

  /// Sets the record data and timestamp of the current edge.
  void set_data(const Any& value, timestamp_t ts) override {
    Record rv = value.AsRecord();
    cur_->set_data(rv, ts);
  }

  /// Advances by `offset` edges, stopping at the end of the slice.
  CsrEdgeIterBase& operator+=(size_t offset) override {
    // Check the remaining distance first so the cursor is never moved
    // beyond the end of the slice (undefined, and may wrap for large
    // offsets).
    if (offset >= static_cast<size_t>(end_ - cur_)) {
      cur_ = end_;
    } else {
      cur_ += offset;
    }
    return *this;
  }

  /// Moves to the following edge (no bounds check).
  void next() override { ++cur_; }

  /// True while the cursor has not reached the end of the slice.
  bool is_valid() const override { return cur_ != end_; }

  /// Number of edges between the cursor and the end of the slice.
  size_t size() const override { return end_ - cur_; }

 private:
  nbr_ptr_t cur_;
  nbr_ptr_t end_;
};
/**
 * Mutable CSR keeping one MutableAdjlist<EDATA_T> per vertex.
 *
 * - adj_lists_ has one entry per vertex; lists set up by batch_init*() and
 *   open*() point into the contiguous nbr_list_ buffer.
 * - locks_ is an array of grape::SpinLock with one lock per vertex; it is
 *   taken by put_edge() around the insertion into a vertex's list.
 * - unsorted_since_ is set by batch_sort_by_edge_data() and persisted in the
 *   "<prefix>.meta" file by dump()/open*().
 */
template <typename EDATA_T>
class MutableCsr : public TypedMutableCsrBase<EDATA_T> {
 public:
  using nbr_t = MutableNbr<EDATA_T>;
  using adjlist_t = MutableAdjlist<EDATA_T>;
  using slice_t = MutableNbrSlice<EDATA_T>;
  using mut_slice_t = MutableNbrSliceMut<EDATA_T>;

  // unsorted_since_ is initialised here because open() with an empty
  // snapshot_dir never calls load_meta() and would otherwise leave it
  // indeterminate.
  MutableCsr() : locks_(nullptr), unsorted_since_(0) {}

  ~MutableCsr() { delete[] locks_; }

  /**
   * Creates file-backed storage "<work_dir>/<name>.adj" / ".nbr" and gives
   * vertex i a capacity of ceil(degree[i] * max(reserve_ratio, 1.0)).
   * @return the total number of nbr slots allocated.
   */
  size_t batch_init(const std::string& name, const std::string& work_dir,
                    const std::vector<int>& degree,
                    double reserve_ratio) override {
    return init_batch(degree, reserve_ratio, work_dir + "/" + name + ".adj",
                      work_dir + "/" + name + ".nbr", true);
  }

  /**
   * Same layout as batch_init(), but both arrays are opened with an empty
   * file name.
   * @return the total number of nbr slots allocated.
   */
  size_t batch_init_in_memory(const std::vector<int>& degree,
                              double reserve_ratio) override {
    return init_batch(degree, reserve_ratio, "", "", false);
  }

  /// Appends an edge to src's list without taking the per-vertex lock.
  void batch_put_edge(vid_t src, vid_t dst, const EDATA_T& data,
                      timestamp_t ts) override {
    adj_lists_[src].batch_put_edge(dst, data, ts);
  }

  /// Sorts every adjacency list by edge data (ascending, operator<) and
  /// records `ts` as the value returned by unsorted_since().
  void batch_sort_by_edge_data(timestamp_t ts) override {
    const size_t vnum = adj_lists_.size();
    for (size_t i = 0; i != vnum; ++i) {
      auto first = adj_lists_[i].data();
      std::sort(first, first + adj_lists_[i].size(),
                [](const nbr_t& lhs, const nbr_t& rhs) {
                  return lhs.data < rhs.data;
                });
    }
    unsorted_since_ = ts;
  }

  timestamp_t unsorted_since() const override { return unsorted_since_; }

  /**
   * Opens the CSR using "<snapshot_dir>/<name>.{deg,cap,nbr,meta}" when
   * snapshot_dir is non-empty, with working files under work_dir.
   * When no ".cap" file exists the degree list doubles as capacity list.
   */
  void open(const std::string& name, const std::string& snapshot_dir,
            const std::string& work_dir) override {
    mmap_array<int> degree_list;
    // Owned on the stack so it is released on every exit path.
    mmap_array<int> cap_storage;
    mmap_array<int>* cap_list = &degree_list;
    if (snapshot_dir != "") {
      degree_list.open(snapshot_dir + "/" + name + ".deg", false);
      if (std::filesystem::exists(snapshot_dir + "/" + name + ".cap")) {
        cap_storage.open(snapshot_dir + "/" + name + ".cap", false);
        cap_list = &cap_storage;
      }
      nbr_list_.open(snapshot_dir + "/" + name + ".nbr", false);
      load_meta(snapshot_dir + "/" + name);
    }
    nbr_list_.touch(work_dir + "/" + name + ".nbr");
    adj_lists_.open(work_dir + "/" + name + ".adj", true);
    adj_lists_.resize(degree_list.size());
    reset_locks(degree_list.size());
    attach_adj_lists(degree_list, *cap_list, degree_list.size());
  }

  /**
   * Opens the CSR from "<prefix>.{deg,cap,nbr,meta}" with room for at least
   * v_cap vertices; vertices beyond the stored degree list get empty lists.
   */
  void open_in_memory(const std::string& prefix, size_t v_cap) override {
    mmap_array<int> degree_list;
    degree_list.open(prefix + ".deg", false);
    load_meta(prefix);
    mmap_array<int> cap_storage;
    mmap_array<int>* cap_list = &degree_list;
    if (std::filesystem::exists(prefix + ".cap")) {
      cap_storage.open(prefix + ".cap", false);
      cap_list = &cap_storage;
    }
    nbr_list_.open(prefix + ".nbr", false);
    adj_lists_.reset();
    v_cap = std::max(v_cap, degree_list.size());
    adj_lists_.resize(v_cap);
    reset_locks(v_cap);
    attach_adj_lists(degree_list, *cap_list, v_cap);
  }

  /**
   * Like open_in_memory(), but nbr_list_ and adj_lists_ are opened through
   * mmap_array::open_with_hugepages.
   */
  void open_with_hugepages(const std::string& prefix, size_t v_cap) override {
    mmap_array<int> degree_list;
    degree_list.open(prefix + ".deg", false);
    load_meta(prefix);
    mmap_array<int> cap_storage;
    mmap_array<int>* cap_list = &degree_list;
    if (std::filesystem::exists(prefix + ".cap")) {
      cap_storage.open(prefix + ".cap", false);
      cap_list = &cap_storage;
    }
    nbr_list_.open_with_hugepages(prefix + ".nbr");
    adj_lists_.reset();
    v_cap = std::max(v_cap, degree_list.size());
    adj_lists_.open_with_hugepages("");
    adj_lists_.resize(v_cap);
    reset_locks(v_cap);
    attach_adj_lists(degree_list, *cap_list, v_cap);
  }

  /**
   * Writes "<new_snapshot_dir>/<name>.{meta,deg,nbr}" and, when some list
   * has size != capacity, ".cap".
   *
   * If every non-empty list still sits at its expected offset inside
   * nbr_list_ and nbr_list_ is backed by an existing file, the ".nbr" file
   * is produced by hard-linking that file; otherwise all lists are written
   * out (capacity entries each).
   *
   * @throws std::runtime_error on hard-link, open, write, flush or close
   *         failure.
   */
  void dump(const std::string& name,
            const std::string& new_snapshot_dir) override {
    const size_t vnum = adj_lists_.size();
    const std::string nbr_path = new_snapshot_dir + "/" + name + ".nbr";
    bool reuse_nbr_list = true;
    dump_meta(new_snapshot_dir + "/" + name);

    mmap_array<int> degree_list;
    std::vector<int> cap_list;
    degree_list.open(new_snapshot_dir + "/" + name + ".deg", true);
    degree_list.resize(vnum);
    cap_list.resize(vnum);
    bool need_cap_list = false;
    size_t offset = 0;
    for (size_t i = 0; i < vnum; ++i) {
      if (adj_lists_[i].size() != 0) {
        // A non-empty list that is not at its expected position in
        // nbr_list_ means the buffer cannot be reused as-is.
        if (!(adj_lists_[i].data() == nbr_list_.data() + offset &&
              offset < nbr_list_.size())) {
          reuse_nbr_list = false;
        }
      }
      offset += adj_lists_[i].capacity();
      degree_list[i] = adj_lists_[i].size();
      cap_list[i] = adj_lists_[i].capacity();
      if (degree_list[i] != cap_list[i]) {
        need_cap_list = true;
      }
    }
    if (need_cap_list) {
      write_file(new_snapshot_dir + "/" + name + ".cap", cap_list.data(),
                 sizeof(int), cap_list.size());
    }

    if (reuse_nbr_list && !nbr_list_.filename().empty() &&
        std::filesystem::exists(nbr_list_.filename())) {
      std::error_code errorCode;
      std::filesystem::create_hard_link(nbr_list_.filename(), nbr_path,
                                        errorCode);
      if (errorCode) {
        std::stringstream ss;
        ss << "Failed to create hard link from " << nbr_list_.filename()
           << " to " << nbr_path << ", error code: " << errorCode << " "
           << errorCode.message();
        log_and_throw(ss.str());
      }
      return;
    }

    FILE* fout = fopen(nbr_path.c_str(), "wb");
    if (fout == nullptr) {
      std::stringstream ss;
      ss << "Failed to open nbr list " << nbr_path << ", " << strerror(errno);
      log_and_throw(ss.str());
    }
    for (size_t i = 0; i < vnum; ++i) {
      const size_t expected = static_cast<size_t>(adj_lists_[i].capacity());
      const size_t written =
          fwrite(adj_lists_[i].data(), sizeof(nbr_t), expected, fout);
      if (written != expected) {
        // Build the message before fclose(), which may overwrite errno.
        std::stringstream ss;
        ss << "Failed to write nbr list " << nbr_path << ", expected "
           << expected << ", got " << written << ", " << strerror(errno);
        fclose(fout);
        log_and_throw(ss.str());
      }
    }
    int ret = 0;
    if ((ret = fflush(fout)) != 0) {
      std::stringstream ss;
      ss << "Failed to flush nbr list " << nbr_path << ", error code: " << ret
         << " " << strerror(errno);
      fclose(fout);
      log_and_throw(ss.str());
    }
    if ((ret = fclose(fout)) != 0) {
      std::stringstream ss;
      ss << "Failed to close nbr list " << nbr_path << ", error code: " << ret
         << " " << strerror(errno);
      log_and_throw(ss.str());
    }
  }

  /// Reads the neighbor id of every edge using `thread_num` threads that
  /// grab vertex ranges of 4096 at a time; the sum is discarded.
  void warmup(int thread_num) const override {
    const size_t vnum = adj_lists_.size();
    const size_t chunk = 4096;
    std::vector<std::thread> threads;
    std::atomic<size_t> v_i(0);
    std::atomic<size_t> output(0);
    for (int i = 0; i < thread_num; ++i) {
      threads.emplace_back([&]() {
        size_t ret = 0;
        while (true) {
          size_t begin = std::min(v_i.fetch_add(chunk), vnum);
          size_t end = std::min(begin + chunk, vnum);
          if (begin == end) {
            break;
          }
          while (begin < end) {
            auto adj_list = get_edges(begin);
            for (auto& nbr : adj_list) {
              ret += nbr.neighbor;
            }
            ++begin;
          }
        }
        output.fetch_add(ret);
      });
    }
    for (auto& thrd : threads) {
      thrd.join();
    }
    (void) output.load();
  }

  /// Resizes the vertex dimension. When growing, new vertices get empty
  /// lists and the lock array is re-created with `vnum` locks.
  void resize(vid_t vnum) override {
    if (vnum > adj_lists_.size()) {
      const size_t old_size = adj_lists_.size();
      adj_lists_.resize(vnum);
      for (size_t k = old_size; k != vnum; ++k) {
        adj_lists_[k].init(nullptr, 0, 0);
      }
      reset_locks(vnum);
    } else {
      adj_lists_.resize(vnum);
    }
  }

  /// Number of vertices (entries of adj_lists_).
  size_t size() const override { return adj_lists_.size(); }

  /// Sum of the sizes of all adjacency lists.
  size_t edge_num() const override {
    size_t res = 0;
    for (size_t i = 0; i < adj_lists_.size(); ++i) {
      res += adj_lists_[i].size();
    }
    return res;
  }

  std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override {
    return std::make_shared<MutableCsrConstEdgeIter<EDATA_T>>(get_edges(v));
  }

  /// Returns a heap-allocated iterator created with `new`.
  CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override {
    return new MutableCsrConstEdgeIter<EDATA_T>(get_edges(v));
  }

  std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override {
    return std::make_shared<MutableCsrEdgeIter<EDATA_T>>(get_edges_mut(v));
  }

  /// Inserts an edge into src's list while holding src's spin lock.
  void put_edge(vid_t src, vid_t dst, const EDATA_T& data, timestamp_t ts,
                Allocator& alloc) override {
    CHECK_LT(src, adj_lists_.size());
    locks_[src].lock();
    adj_lists_[src].put_edge(dst, data, ts, alloc);
    locks_[src].unlock();
  }

  inline slice_t get_edges(vid_t v) const override {
    return adj_lists_[v].get_edges();
  }

  inline mut_slice_t get_edges_mut(vid_t i) {
    return adj_lists_[i].get_edges_mut();
  }

  /// Releases the lock array and resets both backing arrays.
  void close() override {
    if (locks_ != nullptr) {
      delete[] locks_;
      locks_ = nullptr;
    }
    adj_lists_.reset();
    nbr_list_.reset();
  }

 private:
  /// Logs `message` at ERROR level and throws it as std::runtime_error.
  [[noreturn]] static void log_and_throw(const std::string& message) {
    LOG(ERROR) << message;
    throw std::runtime_error(message);
  }

  /// Replaces the lock array with `n` fresh locks, freeing any previous
  /// array so repeated initialisation does not leak.
  void reset_locks(size_t n) {
    delete[] locks_;
    locks_ = nullptr;  // stay consistent if the allocation below throws
    locks_ = new grape::SpinLock[n];
  }

  /// Shared body of batch_init() and batch_init_in_memory(). `open_flag` is
  /// forwarded as the second argument of mmap_array::open for both arrays.
  size_t init_batch(const std::vector<int>& degree, double reserve_ratio,
                    const std::string& adj_file, const std::string& nbr_file,
                    bool open_flag) {
    reserve_ratio = std::max(reserve_ratio, 1.0);
    const size_t vnum = degree.size();
    adj_lists_.open(adj_file, open_flag);
    adj_lists_.resize(vnum);
    reset_locks(vnum);
    size_t edge_num = 0;
    for (auto d : degree) {
      edge_num += (std::ceil(d * reserve_ratio));
    }
    nbr_list_.open(nbr_file, open_flag);
    nbr_list_.resize(edge_num);
    nbr_t* ptr = nbr_list_.data();
    for (size_t i = 0; i < vnum; ++i) {
      int cap = std::ceil(degree[i] * reserve_ratio);
      adj_lists_[i].init(ptr, cap, 0);
      ptr += cap;
    }
    unsorted_since_ = 0;
    return edge_num;
  }

  /// Points adj_lists_[0, degree_list.size()) at consecutive regions of
  /// nbr_list_ (cap_list[i] slots each, degree_list[i] of them used) and
  /// gives vertices [degree_list.size(), total) empty lists.
  void attach_adj_lists(const mmap_array<int>& degree_list,
                        const mmap_array<int>& cap_list, size_t total) {
    nbr_t* ptr = nbr_list_.data();
    const size_t stored = degree_list.size();
    for (size_t i = 0; i < stored; ++i) {
      int degree = degree_list[i];
      int cap = cap_list[i];
      adj_lists_[i].init(ptr, cap, degree);
      ptr += cap;
    }
    for (size_t i = stored; i < total; ++i) {
      adj_lists_[i].init(ptr, 0, 0);
    }
  }

  /// Loads unsorted_since_ from "<prefix>.meta", or sets it to 0 when the
  /// file does not exist.
  void load_meta(const std::string& prefix) {
    std::string meta_file_path = prefix + ".meta";
    if (std::filesystem::exists(meta_file_path)) {
      read_file(meta_file_path, &unsorted_since_, sizeof(timestamp_t), 1);
    } else {
      unsorted_since_ = 0;
    }
  }

  /// Writes unsorted_since_ to "<prefix>.meta".
  void dump_meta(const std::string& prefix) const {
    std::string meta_file_path = prefix + ".meta";
    write_file(meta_file_path, &unsorted_since_, sizeof(timestamp_t), 1);
  }

  grape::SpinLock* locks_;
  mmap_array<adjlist_t> adj_lists_;
  mmap_array<nbr_t> nbr_list_;
  timestamp_t unsorted_since_;
};
/**
 * MutableCsr specialisation for std::string_view edge data.
 *
 * Edges are kept in an inner MutableCsr<size_t>; the size_t payload is
 * combined with the referenced StringColumn when slices are built.
 */
template <>
class MutableCsr<std::string_view>
    : public TypedMutableCsrBase<std::string_view> {
 public:
  using nbr_t = MutableNbr<size_t>;
  using adjlist_t = MutableAdjlist<size_t>;
  using slice_t = MutableNbrSlice<std::string_view>;
  using mut_slice_t = MutableNbrSliceMut<std::string_view>;

  /// Keeps a reference to `column`; the inner CSR starts out empty.
  MutableCsr(StringColumn& column) : column_(column), csr_() {}
  ~MutableCsr() {}

  // ---- initialisation: forwarded to the inner index CSR -----------------

  size_t batch_init(const std::string& name, const std::string& work_dir,
                    const std::vector<int>& degree,
                    double reserve_ratio) override {
    const size_t allocated =
        csr_.batch_init(name, work_dir, degree, reserve_ratio);
    return allocated;
  }

  size_t batch_init_in_memory(const std::vector<int>& degree,
                              double reserve) override {
    const size_t allocated = csr_.batch_init_in_memory(degree, reserve);
    return allocated;
  }

  /// Stores `data` as the size_t payload of the new edge.
  void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data,
                                 timestamp_t ts) override {
    csr_.batch_put_edge(src, dst, data, ts);
  }

  // ---- persistence: forwarded to the inner index CSR --------------------

  void open(const std::string& name, const std::string& snapshot_dir,
            const std::string& work_dir) override {
    csr_.open(name, snapshot_dir, work_dir);
  }

  void open_in_memory(const std::string& prefix, size_t v_cap) override {
    csr_.open_in_memory(prefix, v_cap);
  }

  void dump(const std::string& name,
            const std::string& new_snapshot_dir) override {
    csr_.dump(name, new_snapshot_dir);
  }

  void warmup(int thread_num) const override { csr_.warmup(thread_num); }

  // ---- sizing -------------------------------------------------------------

  void resize(vid_t vnum) override { csr_.resize(vnum); }

  size_t size() const override { return csr_.size(); }

  size_t edge_num() const override { return csr_.edge_num(); }

  // ---- iteration ----------------------------------------------------------

  std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override {
    using iter_t = MutableCsrConstEdgeIter<std::string_view>;
    return std::make_shared<iter_t>(get_edges(v));
  }

  /// Returns a heap-allocated iterator created with `new`.
  CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override {
    using iter_t = MutableCsrConstEdgeIter<std::string_view>;
    return new iter_t(get_edges(v));
  }

  std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override {
    using iter_t = MutableCsrEdgeIter<std::string_view>;
    return std::make_shared<iter_t>(get_edges_mut(v));
  }

  // ---- edge insertion -----------------------------------------------------

  /// Inserts an edge whose payload is the size_t value `data`.
  void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts,
                Allocator& alloc) {
    csr_.put_edge(src, dst, data, ts, alloc);
  }

  /// Inserts an edge whose payload is the size_t value `index`.
  void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts,
                           Allocator& alloc) override {
    csr_.put_edge(src, dst, index, ts, alloc);
  }

  // ---- slices: inner slice paired with the string column ------------------

  inline slice_t get_edges(vid_t i) const override {
    return slice_t(csr_.get_edges(i), column_);
  }

  inline mut_slice_t get_edges_mut(vid_t i) {
    return mut_slice_t(csr_.get_edges_mut(i), column_);
  }

  void close() override { csr_.close(); }

 private:
  StringColumn& column_;
  MutableCsr<size_t> csr_;
};
/**
 * MutableCsr specialisation for RecordView edge data.
 *
 * Edges are kept in an inner MutableCsr<size_t>; the size_t payload is
 * combined with the referenced Table when slices are built.
 */
template <>
class MutableCsr<RecordView> : public TypedMutableCsrBase<RecordView> {
 public:
  using nbr_t = MutableNbr<size_t>;
  using adjlist_t = MutableAdjlist<size_t>;
  using slice_t = MutableNbrSlice<RecordView>;
  using mut_slice_t = MutableNbrSliceMut<RecordView>;

  /// Keeps a reference to `table`; the inner CSR starts out empty.
  MutableCsr(Table& table) : table_(table), csr_() {}
  ~MutableCsr() {}

  // ---- initialisation: forwarded to the inner index CSR -----------------

  size_t batch_init(const std::string& name, const std::string& work_dir,
                    const std::vector<int>& degree,
                    double reserve_ratio = 1.2) override {
    const size_t allocated =
        csr_.batch_init(name, work_dir, degree, reserve_ratio);
    return allocated;
  }

  size_t batch_init_in_memory(const std::vector<int>& degree,
                              double reserve_ratio = 1.2) override {
    const size_t allocated = csr_.batch_init_in_memory(degree, reserve_ratio);
    return allocated;
  }

  /// Stores `data` as the size_t payload of the new edge.
  void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data,
                                 timestamp_t ts = 0) override {
    csr_.batch_put_edge(src, dst, data, ts);
  }

  // ---- persistence: forwarded to the inner index CSR --------------------

  void open(const std::string& name, const std::string& snapshot_dir,
            const std::string& work_dir) override {
    csr_.open(name, snapshot_dir, work_dir);
  }

  void open_in_memory(const std::string& prefix, size_t v_cap) override {
    csr_.open_in_memory(prefix, v_cap);
  }

  void dump(const std::string& name,
            const std::string& new_snapshot_dir) override {
    csr_.dump(name, new_snapshot_dir);
  }

  void warmup(int thread_num) const override { csr_.warmup(thread_num); }

  // ---- sizing -------------------------------------------------------------

  void resize(vid_t vnum) override { csr_.resize(vnum); }

  size_t size() const override { return csr_.size(); }

  size_t edge_num() const override { return csr_.edge_num(); }

  // ---- iteration ----------------------------------------------------------

  std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override {
    using iter_t = MutableCsrConstEdgeIter<RecordView>;
    return std::make_shared<iter_t>(get_edges(v));
  }

  /// Returns a heap-allocated iterator created with `new`.
  CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override {
    using iter_t = MutableCsrConstEdgeIter<RecordView>;
    return new iter_t(get_edges(v));
  }

  std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override {
    using iter_t = MutableCsrEdgeIter<RecordView>;
    return std::make_shared<iter_t>(get_edges_mut(v));
  }

  // ---- edge insertion -----------------------------------------------------

  /// Inserts an edge whose payload is the size_t value `data`.
  void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts,
                Allocator& alloc) {
    csr_.put_edge(src, dst, data, ts, alloc);
  }

  /// Inserts an edge whose payload is the size_t value `index`.
  void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts,
                           Allocator& alloc) override {
    csr_.put_edge(src, dst, index, ts, alloc);
  }

  // ---- slices: inner slice paired with the table --------------------------

  inline slice_t get_edges(vid_t i) const override {
    return slice_t(csr_.get_edges(i), table_);
  }

  inline mut_slice_t get_edges_mut(vid_t i) {
    return mut_slice_t(csr_.get_edges_mut(i), table_);
  }

  void close() override { csr_.close(); }

 private:
  Table& table_;
  MutableCsr<size_t> csr_;
};
/**
 * CSR holding at most one edge per vertex.
 *
 * nbr_list_ has one MutableNbr<EDATA_T> slot per vertex. A slot whose
 * timestamp equals std::numeric_limits<timestamp_t>::max() is treated as
 * "no edge".
 */
template <typename EDATA_T>
class SingleMutableCsr : public TypedMutableCsrBase<EDATA_T> {
 public:
  using nbr_t = MutableNbr<EDATA_T>;
  using slice_t = MutableNbrSlice<EDATA_T>;
  using mut_slice_t = MutableNbrSliceMut<EDATA_T>;

  SingleMutableCsr() {}
  ~SingleMutableCsr() {}

  /// Creates "<work_dir>/<name>.snbr" with degree.size() empty slots.
  /// @return the number of slots.
  size_t batch_init(const std::string& name, const std::string& work_dir,
                    const std::vector<int>& degree,
                    double reserve_ratio) override {
    const size_t slot_num = degree.size();
    nbr_list_.open(work_dir + "/" + name + ".snbr", true);
    nbr_list_.resize(slot_num);
    mark_empty(0, slot_num);
    return slot_num;
  }

  /// Same as batch_init(), with nbr_list_ opened under an empty file name.
  size_t batch_init_in_memory(const std::vector<int>& degree,
                              double reserve_ratio) override {
    const size_t slot_num = degree.size();
    nbr_list_.open("", false);
    nbr_list_.resize(slot_num);
    mark_empty(0, slot_num);
    return slot_num;
  }

  /// Fills src's slot; CHECK-fails if the slot's previous timestamp was not
  /// the "no edge" marker.
  void batch_put_edge(vid_t src, vid_t dst, const EDATA_T& data,
                      timestamp_t ts = 0) override {
    nbr_t& slot = nbr_list_[src];
    slot.neighbor = dst;
    slot.data = data;
    CHECK_EQ(slot.timestamp.load(), std::numeric_limits<timestamp_t>::max());
    slot.timestamp.store(ts);
  }

  /// No-op for this layout.
  void batch_sort_by_edge_data(timestamp_t ts) override {}

  timestamp_t unsorted_since() const override {
    return std::numeric_limits<timestamp_t>::max();
  }

  /// Opens "<work_dir>/<name>.snbr", first copying it from snapshot_dir when
  /// it does not exist in work_dir yet.
  void open(const std::string& name, const std::string& snapshot_dir,
            const std::string& work_dir) override {
    if (!std::filesystem::exists(work_dir + "/" + name + ".snbr")) {
      copy_file(snapshot_dir + "/" + name + ".snbr",
                work_dir + "/" + name + ".snbr");
    }
    nbr_list_.open(work_dir + "/" + name + ".snbr", true);
  }

  /// Opens "<prefix>.snbr"; if it holds fewer than v_cap slots, the array is
  /// re-created with v_cap slots, the stored slots are read back from the
  /// file and the remaining ones are marked empty.
  void open_in_memory(const std::string& prefix, size_t v_cap) override {
    nbr_list_.open(prefix + ".snbr", false);
    const size_t stored = nbr_list_.size();
    if (stored >= v_cap) {
      return;
    }
    nbr_list_.reset();
    nbr_list_.resize(v_cap);
    read_file(prefix + ".snbr", nbr_list_.data(), sizeof(nbr_t), stored);
    mark_empty(stored, v_cap);
  }

  /// Opens "<prefix>.snbr" through mmap_array::open_with_hugepages and grows
  /// it to v_cap slots if needed, marking the added slots empty.
  void open_with_hugepages(const std::string& prefix, size_t v_cap) override {
    nbr_list_.open_with_hugepages(prefix + ".snbr", v_cap);
    const size_t stored = nbr_list_.size();
    if (stored < v_cap) {
      nbr_list_.resize(v_cap);
      mark_empty(stored, v_cap);
    }
  }

  /// Produces "<new_snapshot_dir>/<name>.snbr", by hard link when nbr_list_
  /// is backed by an existing file and by writing the slots otherwise.
  /// @throws std::runtime_error when the hard link cannot be created.
  void dump(const std::string& name,
            const std::string& new_snapshot_dir) override {
    const bool file_backed = !nbr_list_.filename().empty() &&
                             std::filesystem::exists(nbr_list_.filename());
    if (!file_backed) {
      write_file(new_snapshot_dir + "/" + name + ".snbr", nbr_list_.data(),
                 sizeof(nbr_t), nbr_list_.size());
      return;
    }
    std::error_code errorCode;
    std::filesystem::create_hard_link(nbr_list_.filename(),
                                      new_snapshot_dir + "/" + name + ".snbr",
                                      errorCode);
    if (errorCode) {
      std::stringstream ss;
      ss << "Failed to create hard link from " << nbr_list_.filename()
         << " to " << new_snapshot_dir + "/" + name + ".snbr"
         << ", error code: " << errorCode << " " << errorCode.message();
      LOG(ERROR) << ss.str();
      throw std::runtime_error(ss.str());
    }
  }

  /// Reads the neighbor id of every slot using `thread_num` threads that
  /// grab ranges of 4096 slots at a time; the sum is discarded.
  void warmup(int thread_num) const override {
    const size_t slot_num = nbr_list_.size();
    const size_t chunk = 4096;
    std::atomic<size_t> next_chunk(0);
    std::atomic<size_t> output(0);
    std::vector<std::thread> workers;
    for (int t = 0; t < thread_num; ++t) {
      workers.emplace_back([&]() {
        size_t local_sum = 0;
        for (;;) {
          const size_t first = std::min(next_chunk.fetch_add(chunk), slot_num);
          const size_t last = std::min(first + chunk, slot_num);
          if (first == last) {
            break;
          }
          for (size_t v = first; v < last; ++v) {
            local_sum += nbr_list_[v].neighbor;
          }
        }
        output.fetch_add(local_sum);
      });
    }
    for (auto& worker : workers) {
      worker.join();
    }
    (void) output.load();
  }

  /// Resizes the slot array; slots added by growing are marked empty.
  void resize(vid_t vnum) override {
    const size_t old_size = nbr_list_.size();
    nbr_list_.resize(vnum);
    if (vnum > old_size) {
      mark_empty(old_size, vnum);
    }
  }

  size_t size() const override { return nbr_list_.size(); }

  /// Number of slots that currently hold an edge.
  size_t edge_num() const override {
    size_t occupied = 0;
    const size_t slot_num = nbr_list_.size();
    for (size_t k = 0; k != slot_num; ++k) {
      if (has_edge(k)) {
        ++occupied;
      }
    }
    return occupied;
  }

  std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override {
    return std::make_shared<MutableCsrConstEdgeIter<EDATA_T>>(get_edges(v));
  }

  /// Returns a heap-allocated iterator created with `new`.
  CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override {
    return new MutableCsrConstEdgeIter<EDATA_T>(get_edges(v));
  }

  std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override {
    return std::make_shared<MutableCsrEdgeIter<EDATA_T>>(get_edges_mut(v));
  }

  /// Fills src's slot; CHECK-fails if src is out of range or the slot's
  /// previous timestamp was not the "no edge" marker.
  void put_edge(vid_t src, vid_t dst, const EDATA_T& data, timestamp_t ts,
                Allocator& alloc) override {
    CHECK_LT(src, nbr_list_.size());
    nbr_t& slot = nbr_list_[src];
    slot.neighbor = dst;
    slot.data = data;
    CHECK_EQ(slot.timestamp.load(), std::numeric_limits<timestamp_t>::max());
    slot.timestamp.store(ts);
  }

  /// Slice of size 1 starting at v's slot when it holds an edge, otherwise a
  /// slice of size 0 whose begin is left untouched.
  inline slice_t get_edges(vid_t v) const override {
    slice_t slice;
    const bool present = has_edge(v);
    slice.set_size(present ? 1 : 0);
    if (present) {
      slice.set_begin(&nbr_list_[v]);
    }
    return slice;
  }

  /// Mutable counterpart of get_edges().
  inline mut_slice_t get_edges_mut(vid_t i) {
    mut_slice_t slice;
    const bool present = has_edge(i);
    slice.set_size(present ? 1 : 0);
    if (present) {
      slice.set_begin(&nbr_list_[i]);
    }
    return slice;
  }

  /// Direct reference to vertex i's slot, whether or not it holds an edge.
  inline const nbr_t& get_edge(vid_t i) const { return nbr_list_[i]; }

  void close() override { nbr_list_.reset(); }

 private:
  /// True when slot k's timestamp differs from the "no edge" marker.
  bool has_edge(size_t k) const {
    return nbr_list_[k].timestamp.load() !=
           std::numeric_limits<timestamp_t>::max();
  }

  /// Stores the "no edge" marker into the timestamps of slots [from, to).
  void mark_empty(size_t from, size_t to) {
    for (size_t k = from; k != to; ++k) {
      nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
    }
  }

  mmap_array<nbr_t> nbr_list_;
};
/**
 * SingleMutableCsr specialisation for std::string_view edge data.
 *
 * Edges are kept in an inner SingleMutableCsr<size_t>; the size_t payload is
 * resolved against the referenced StringColumn when edges are read.
 */
template <>
class SingleMutableCsr<std::string_view>
    : public TypedMutableCsrBase<std::string_view> {
 public:
  using nbr_t = MutableNbr<size_t>;
  using slice_t = MutableNbrSlice<std::string_view>;
  using mut_slice_t = MutableNbrSliceMut<std::string_view>;

  /// Keeps a reference to `column`; the inner CSR starts out empty.
  SingleMutableCsr(StringColumn& column) : column_(column), csr_() {}
  ~SingleMutableCsr() {}

  // ---- initialisation: forwarded to the inner index CSR -----------------

  size_t batch_init(const std::string& name, const std::string& work_dir,
                    const std::vector<int>& degree,
                    double reserve_ratio) override {
    const size_t slot_num =
        csr_.batch_init(name, work_dir, degree, reserve_ratio);
    return slot_num;
  }

  size_t batch_init_in_memory(const std::vector<int>& degree,
                              double reserve_ratio) override {
    const size_t slot_num = csr_.batch_init_in_memory(degree, reserve_ratio);
    return slot_num;
  }

  /// Stores `data` as the size_t payload of src's edge.
  void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data,
                                 timestamp_t ts) override {
    csr_.batch_put_edge(src, dst, data, ts);
  }

  /// No-op for this layout.
  void batch_sort_by_edge_data(timestamp_t ts) override {}

  timestamp_t unsorted_since() const override {
    return std::numeric_limits<timestamp_t>::max();
  }

  // ---- persistence: forwarded to the inner index CSR --------------------

  void open(const std::string& name, const std::string& snapshot_dir,
            const std::string& work_dir) override {
    csr_.open(name, snapshot_dir, work_dir);
  }

  void open_in_memory(const std::string& prefix, size_t v_cap) override {
    csr_.open_in_memory(prefix, v_cap);
  }

  void dump(const std::string& name,
            const std::string& new_snapshot_dir) override {
    csr_.dump(name, new_snapshot_dir);
  }

  void warmup(int thread_num) const override { csr_.warmup(thread_num); }

  // ---- sizing -------------------------------------------------------------

  void resize(vid_t vnum) override { csr_.resize(vnum); }

  size_t size() const override { return csr_.size(); }

  size_t edge_num() const override { return csr_.edge_num(); }

  // ---- iteration ----------------------------------------------------------

  std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override {
    using iter_t = MutableCsrConstEdgeIter<std::string_view>;
    return std::make_shared<iter_t>(get_edges(v));
  }

  /// Returns a heap-allocated iterator created with `new`.
  CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override {
    using iter_t = MutableCsrConstEdgeIter<std::string_view>;
    return new iter_t(get_edges(v));
  }

  std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override {
    using iter_t = MutableCsrEdgeIter<std::string_view>;
    return std::make_shared<iter_t>(get_edges_mut(v));
  }

  // ---- edge insertion -----------------------------------------------------

  /// Inserts an edge whose payload is the size_t value `data`.
  void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts,
                Allocator& alloc) {
    csr_.put_edge(src, dst, data, ts, alloc);
  }

  /// Same as put_edge(), reached through the virtual interface.
  void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts,
                           Allocator& alloc) override {
    put_edge(src, dst, index, ts, alloc);
  }

  // ---- reads: inner slice / slot paired with the string column ------------

  inline slice_t get_edges(vid_t i) const override {
    return slice_t(csr_.get_edges(i), column_);
  }

  inline mut_slice_t get_edges_mut(vid_t i) {
    return mut_slice_t(csr_.get_edges_mut(i), column_);
  }

  /// Copy of vertex i's slot with the payload replaced by
  /// column_.get_view(payload).
  inline MutableNbr<std::string_view> get_edge(vid_t i) const {
    auto index_nbr = csr_.get_edge(i);
    MutableNbr<std::string_view> result;
    result.neighbor = index_nbr.neighbor;
    result.timestamp.store(index_nbr.timestamp.load());
    result.data = column_.get_view(index_nbr.data);
    return result;
  }

  void close() override { csr_.close(); }

 private:
  StringColumn& column_;
  SingleMutableCsr<size_t> csr_;
};
/**
 * SingleMutableCsr specialisation for RecordView edge data.
 *
 * Edges are kept in an inner SingleMutableCsr<size_t>; the size_t payload is
 * combined with the referenced Table when edges are read.
 */
template <>
class SingleMutableCsr<RecordView> : public TypedMutableCsrBase<RecordView> {
 public:
  using nbr_t = MutableNbr<size_t>;
  using slice_t = MutableNbrSlice<RecordView>;
  using mut_slice_t = MutableNbrSliceMut<RecordView>;

  /// Keeps a reference to `table`; the inner CSR starts out empty.
  SingleMutableCsr(Table& table) : table_(table), csr_() {}
  ~SingleMutableCsr() {}

  size_t batch_init(const std::string& name, const std::string& work_dir,
                    const std::vector<int>& degree,
                    double reserve_ratio) override {
    return csr_.batch_init(name, work_dir, degree, reserve_ratio);
  }

  size_t batch_init_in_memory(const std::vector<int>& degree,
                              double reserve_ratio) override {
    return csr_.batch_init_in_memory(degree, reserve_ratio);
  }

  /// Stores `data` as the size_t payload of src's edge.
  void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data,
                                 timestamp_t ts) override {
    csr_.batch_put_edge(src, dst, data, ts);
  }

  /// No-op for this layout.
  void batch_sort_by_edge_data(timestamp_t ts) override {}

  timestamp_t unsorted_since() const override {
    return std::numeric_limits<timestamp_t>::max();
  }

  void open(const std::string& name, const std::string& snapshot_dir,
            const std::string& work_dir) override {
    csr_.open(name, snapshot_dir, work_dir);
  }

  void open_in_memory(const std::string& prefix, size_t v_cap) override {
    csr_.open_in_memory(prefix, v_cap);
  }

  void dump(const std::string& name,
            const std::string& new_snapshot_dir) override {
    csr_.dump(name, new_snapshot_dir);
  }

  void warmup(int thread_num) const override { csr_.warmup(thread_num); }

  void resize(vid_t vnum) override { csr_.resize(vnum); }

  size_t size() const override { return csr_.size(); }

  size_t edge_num() const override { return csr_.edge_num(); }

  std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override {
    return std::make_shared<MutableCsrConstEdgeIter<RecordView>>(get_edges(v));
  }

  /// Returns a heap-allocated iterator created with `new`.
  CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override {
    return new MutableCsrConstEdgeIter<RecordView>(get_edges(v));
  }

  std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override {
    return std::make_shared<MutableCsrEdgeIter<RecordView>>(get_edges_mut(v));
  }

  /// Inserts an edge whose payload is the size_t value `data`.
  void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts,
                Allocator& alloc) {
    csr_.put_edge(src, dst, data, ts, alloc);
  }

  /// Same as put_edge(), reached through the virtual interface.
  void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts,
                           Allocator& alloc) override {
    put_edge(src, dst, index, ts, alloc);
  }

  inline slice_t get_edges(vid_t i) const override {
    auto ret = csr_.get_edges(i);
    return slice_t(ret, table_);
  }

  inline mut_slice_t get_edges_mut(vid_t i) {
    auto ret = csr_.get_edges_mut(i);
    return mut_slice_t(ret, table_);
  }

  /**
   * Lightweight view of one slot of the inner CSR together with the table
   * its payload refers to. Both members are non-owning: the view is only
   * usable while the CSR slot and the table are alive.
   */
  struct RecordNbr {
    using nbr_t = MutableNbr<size_t>;

    RecordNbr(const nbr_t* ptr, Table& table) : ptr_(ptr), table_(table) {}

    vid_t get_neighbor() const { return ptr_->neighbor; }
    timestamp_t get_timestamp() const { return ptr_->timestamp.load(); }
    size_t get_index() const { return ptr_->data; }
    RecordView get_data() const { return RecordView(ptr_->data, &table_); }

    const nbr_t* ptr_;
    // Held by reference: storing a Table by value copied the table on every
    // get_edge() call and made get_data() return a RecordView pointing into
    // that short-lived copy.
    Table& table_;
  };

  /// View of vertex i's slot, whether or not it holds an edge.
  inline RecordNbr get_edge(vid_t i) const {
    // Bind by reference: csr_.get_edge() returns a reference to the stored
    // slot, and taking the address of a local copy would hand out a
    // dangling pointer.
    const auto& nbr = csr_.get_edge(i);
    return RecordNbr(&nbr, table_);
  }

  void close() override { csr_.close(); }

 private:
  Table& table_;
  SingleMutableCsr<size_t> csr_;
};
/**
 * CSR implementation that never stores anything: every mutating or
 * persistence call is a no-op, sizes are 0 and all iterators/slices are
 * built from empty slices.
 */
template <typename EDATA_T>
class EmptyCsr : public TypedMutableCsrBase<EDATA_T> {
 public:
  using slice_t = MutableNbrSlice<EDATA_T>;

  EmptyCsr() = default;
  ~EmptyCsr() = default;

  // ---- initialisation / persistence: nothing to do ----------------------

  size_t batch_init(const std::string& name, const std::string& work_dir,
                    const std::vector<int>& degree,
                    double reserve_ratio) override {
    return 0;
  }

  size_t batch_init_in_memory(const std::vector<int>& degree,
                              double reserve_ratio) override {
    return 0;
  }

  void open(const std::string& name, const std::string& snapshot_dir,
            const std::string& work_dir) override {}

  void open_in_memory(const std::string& prefix, size_t v_cap) override {}

  void open_with_hugepages(const std::string& prefix, size_t v_cap) override {}

  void dump(const std::string& name,
            const std::string& new_snapshot_dir) override {}

  void warmup(int thread_num) const override {}

  void close() override {}

  // ---- sizing: always empty ---------------------------------------------

  void resize(vid_t vnum) override {}

  size_t size() const override { return 0; }

  size_t edge_num() const override { return 0; }

  // ---- edge insertion: silently ignored ---------------------------------

  void batch_put_edge(vid_t src, vid_t dst, const EDATA_T& data,
                      timestamp_t ts = 0) override {}

  void put_edge(vid_t src, vid_t dst, const EDATA_T& data, timestamp_t ts,
                Allocator&) override {}

  void batch_sort_by_edge_data(timestamp_t ts) override {}

  timestamp_t unsorted_since() const override {
    return std::numeric_limits<timestamp_t>::max();
  }

  // ---- reads: always over an empty slice --------------------------------

  std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override {
    using iter_t = MutableCsrConstEdgeIter<EDATA_T>;
    return std::make_shared<iter_t>(MutableNbrSlice<EDATA_T>::empty());
  }

  /// Returns a heap-allocated iterator created with `new`.
  CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override {
    using iter_t = MutableCsrConstEdgeIter<EDATA_T>;
    return new iter_t(MutableNbrSlice<EDATA_T>::empty());
  }

  std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override {
    using iter_t = MutableCsrEdgeIter<EDATA_T>;
    return std::make_shared<iter_t>(MutableNbrSliceMut<EDATA_T>::empty());
  }

  inline slice_t get_edges(vid_t v) const override { return slice_t::empty(); }
};
// EmptyCsr specialization for std::string_view edge data.
//
// It never holds an edge: all lifecycle and insertion calls are no-ops and
// all size queries report zero. The StringColumn passed at construction is
// only kept so that it can be handed to the `empty(column_)` slice factories.
//
// NOTE(review): unlike the generic EmptyCsr<EDATA_T> template, this
// specialization does not override open_with_hugepages,
// batch_sort_by_edge_data or unsorted_since -- confirm that whatever the base
// class provides for them is the intended behavior.
template <>
class EmptyCsr<std::string_view>
    : public TypedMutableCsrBase<std::string_view> {
 public:
  using slice_t = MutableNbrSlice<std::string_view>;

  // Stores a reference to `column`; the column is not copied.
  EmptyCsr(StringColumn& column) : column_(column) {}
  ~EmptyCsr() = default;

  // --- initialization / persistence: nothing to set up, load or store ------

  // Ignores all arguments and reports 0.
  size_t batch_init(const std::string& name, const std::string& work_dir,
                    const std::vector<int>& degree,
                    double reserve_ratio) override {
    return 0;
  }

  // Ignores all arguments and reports 0.
  size_t batch_init_in_memory(const std::vector<int>& degree,
                              double reserve_ratio) override {
    return 0;
  }

  void open(const std::string& name, const std::string& snapshot_dir,
            const std::string& work_dir) override {}

  void open_in_memory(const std::string& prefix, size_t v_cap) override {}

  void dump(const std::string& name,
            const std::string& new_snapshot_dir) override {}

  void warmup(int thread_num) const override {}

  void resize(vid_t vnum) override {}

  // --- size queries: always zero --------------------------------------------

  size_t size() const override { return 0; }

  size_t edge_num() const override { return 0; }

  // --- insertion: silently dropped -------------------------------------------

  void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts,
                           Allocator& alloc) override {}

  void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data,
                                 timestamp_t ts = 0) override {}

  // --- iteration: always over an empty slice bound to column_ ---------------

  // Shared read-only iterator built from slice_t::empty(column_); `v` is
  // ignored.
  std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override {
    using const_iter_t = MutableCsrConstEdgeIter<std::string_view>;
    return std::make_shared<const_iter_t>(slice_t::empty(column_));
  }

  // Same as edge_iter(), but allocated with `new` and returned as a raw
  // pointer; this class never frees it.
  CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override {
    using const_iter_t = MutableCsrConstEdgeIter<std::string_view>;
    return new const_iter_t(slice_t::empty(column_));
  }

  // Shared mutable iterator built from an empty mutable slice bound to
  // column_; `v` is ignored.
  std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override {
    using mut_iter_t = MutableCsrEdgeIter<std::string_view>;
    return std::make_shared<mut_iter_t>(
        MutableNbrSliceMut<std::string_view>::empty(column_));
  }

  // --- direct access ----------------------------------------------------------

  // Always the empty slice bound to column_, whatever `v` is.
  inline slice_t get_edges(vid_t v) const override {
    return slice_t::empty(column_);
  }

  void close() override {}

  // Column forwarded to the empty-slice factories; referenced, not owned.
  StringColumn& column_;
};
// EmptyCsr specialization for RecordView edge data.
//
// It never holds an edge: all lifecycle and insertion calls are no-ops and
// all size queries report zero. The Table passed at construction is only kept
// so that it can be handed to the `empty(table_)` slice factories.
//
// NOTE(review): unlike the generic EmptyCsr<EDATA_T> template, this
// specialization does not override open_with_hugepages,
// batch_sort_by_edge_data or unsorted_since -- confirm that whatever the base
// class provides for them is the intended behavior.
template <>
class EmptyCsr<RecordView> : public TypedMutableCsrBase<RecordView> {
 public:
  using slice_t = MutableNbrSlice<RecordView>;

  // Stores a reference to `table`; the table is not copied.
  EmptyCsr(Table& table) : table_(table) {}
  ~EmptyCsr() = default;

  // --- initialization / persistence: nothing to set up, load or store ------

  // Ignores all arguments and reports 0.
  size_t batch_init(const std::string& name, const std::string& work_dir,
                    const std::vector<int>& degree,
                    double reserve_ratio) override {
    return 0;
  }

  // Ignores all arguments and reports 0.
  size_t batch_init_in_memory(const std::vector<int>& degree,
                              double reserve_ratio) override {
    return 0;
  }

  void open(const std::string& name, const std::string& snapshot_dir,
            const std::string& work_dir) override {}

  void open_in_memory(const std::string& prefix, size_t v_cap) override {}

  void dump(const std::string& name,
            const std::string& new_snapshot_dir) override {}

  void warmup(int thread_num) const override {}

  void resize(vid_t vnum) override {}

  // --- size queries: always zero --------------------------------------------

  size_t size() const override { return 0; }

  size_t edge_num() const override { return 0; }

  // --- insertion: silently dropped -------------------------------------------

  void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts,
                           Allocator& alloc) override {}

  void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data,
                                 timestamp_t ts = 0) override {}

  // --- iteration: always over an empty slice bound to table_ ----------------

  // Shared read-only iterator built from slice_t::empty(table_); `v` is
  // ignored.
  std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override {
    using const_iter_t = MutableCsrConstEdgeIter<RecordView>;
    return std::make_shared<const_iter_t>(slice_t::empty(table_));
  }

  // Same as edge_iter(), but allocated with `new` and returned as a raw
  // pointer; this class never frees it.
  CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override {
    using const_iter_t = MutableCsrConstEdgeIter<RecordView>;
    return new const_iter_t(slice_t::empty(table_));
  }

  // Shared mutable iterator built from an empty mutable slice bound to
  // table_; `v` is ignored.
  std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override {
    using mut_iter_t = MutableCsrEdgeIter<RecordView>;
    return std::make_shared<mut_iter_t>(
        MutableNbrSliceMut<RecordView>::empty(table_));
  }

  // --- direct access ----------------------------------------------------------

  // Always the empty slice bound to table_, whatever `v` is.
  inline slice_t get_edges(vid_t v) const override {
    return slice_t::empty(table_);
  }

  void close() override {}

 private:
  // Table forwarded to the empty-slice factories; referenced, not owned.
  Table& table_;
};
} // namespace gs
#endif // STORAGES_RT_MUTABLE_GRAPH_CSR_MUTABLE_CSR_H_