flex/storages/rt_mutable_graph/csr/immutable_csr.h (701 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef STORAGES_RT_MUTABLE_GRAPH_CSR_IMMUTABLE_CSR_H_ #define STORAGES_RT_MUTABLE_GRAPH_CSR_IMMUTABLE_CSR_H_ namespace gs { template <typename EDATA_T> class ImmutableCsrConstEdgeIter : public CsrConstEdgeIterBase { using const_nbr_ptr_t = typename ImmutableNbrSlice<EDATA_T>::const_nbr_ptr_t; public: explicit ImmutableCsrConstEdgeIter(const ImmutableNbrSlice<EDATA_T>& slice) : cur_(slice.begin()), end_(slice.end()) {} ~ImmutableCsrConstEdgeIter() = default; vid_t get_neighbor() const override { return (*cur_).get_neighbor(); } Any get_data() const override { return AnyConverter<EDATA_T>::to_any((*cur_).get_data()); } timestamp_t get_timestamp() const override { return 0; } void next() override { ++cur_; } CsrConstEdgeIterBase& operator+=(size_t offset) override { cur_ += offset; if (!(cur_ < end_)) { cur_ = end_; } return *this; } bool is_valid() const override { return cur_ != end_; } size_t size() const override { return end_ - cur_; } private: const_nbr_ptr_t cur_; const_nbr_ptr_t end_; }; template <typename EDATA_T> class ImmutableCsr : public TypedImmutableCsrBase<EDATA_T> { public: using nbr_t = ImmutableNbr<EDATA_T>; using slice_t = ImmutableNbrSlice<EDATA_T>; size_t batch_init(const std::string& name, const std::string& work_dir, const std::vector<int>& degree, double reserve_ratio) override { size_t vnum = degree.size(); adj_lists_.open(work_dir + "/" + name + ".adj", true); adj_lists_.resize(vnum); size_t edge_num = 0; for (auto d : degree) { edge_num += d; } nbr_list_.open(work_dir + "/" + name + ".nbr", true); nbr_list_.resize(edge_num); degree_list_.open(work_dir + "/" + name + ".deg", true); degree_list_.resize(vnum); nbr_t* ptr = nbr_list_.data(); for (vid_t i = 0; i < vnum; ++i) { int deg = degree[i]; if (deg != 0) { adj_lists_[i] = ptr; } else { adj_lists_[i] = NULL; } ptr += deg; degree_list_[i] = 0; } unsorted_since_ = 0; return edge_num; } size_t batch_init_in_memory(const std::vector<int>& degree, double reserve_ratio) override { size_t vnum = degree.size(); adj_lists_.open("", false); adj_lists_.resize(vnum); size_t edge_num = 0; for (auto d : degree) { edge_num += d; } nbr_list_.open("", false); nbr_list_.resize(edge_num); degree_list_.open("", false); degree_list_.resize(vnum); nbr_t* ptr = nbr_list_.data(); for (vid_t i = 0; i < vnum; ++i) { int deg = degree[i]; if (deg != 0) { adj_lists_[i] = ptr; } else { adj_lists_[i] = NULL; } ptr += deg; degree_list_[i] = 0; } unsorted_since_ = 0; return edge_num; } void batch_put_edge(vid_t src, vid_t dst, const EDATA_T& data, timestamp_t ts) override { auto& nbr = adj_lists_[src][degree_list_[src]++]; nbr.neighbor = dst; nbr.data = data; } void batch_sort_by_edge_data(timestamp_t ts) override { size_t vnum = adj_lists_.size(); for (size_t i = 0; i != vnum; ++i) { std::sort(adj_lists_[i], adj_lists_[i] + degree_list_[i], [](const nbr_t& lhs, const nbr_t& rhs) { return lhs.data < rhs.data; }); } unsorted_since_ = ts; } timestamp_t unsorted_since() const override { return unsorted_since_; } void open(const std::string& name, const std::string& snapshot_dir, const std::string& work_dir) override { // Changes made to the CSR will not be synchronized to the file // TODO(luoxiaojian): Implement the insert operation on ImmutableCsr. if (snapshot_dir != "") { degree_list_.open(snapshot_dir + "/" + name + ".deg", false); nbr_list_.open(snapshot_dir + "/" + name + ".nbr", false); load_meta(snapshot_dir + "/" + name); } adj_lists_.open(work_dir + "/" + name + ".adj", true); adj_lists_.resize(degree_list_.size()); nbr_t* ptr = nbr_list_.data(); for (size_t i = 0; i < degree_list_.size(); ++i) { int deg = degree_list_[i]; adj_lists_[i] = ptr; ptr += deg; } } void open_in_memory(const std::string& prefix, size_t v_cap) override { degree_list_.open(prefix + ".deg", false); load_meta(prefix); nbr_list_.open(prefix + ".nbr", false); adj_lists_.reset(); v_cap = std::max(v_cap, degree_list_.size()); adj_lists_.resize(v_cap); size_t old_degree_size = degree_list_.size(); degree_list_.resize(v_cap); nbr_t* ptr = nbr_list_.data(); for (size_t i = 0; i < old_degree_size; ++i) { int deg = degree_list_[i]; if (deg != 0) { adj_lists_[i] = ptr; } else { adj_lists_[i] = NULL; } ptr += deg; } for (size_t i = old_degree_size; i < degree_list_.size(); ++i) { degree_list_[i] = 0; adj_lists_[i] = NULL; } } void open_with_hugepages(const std::string& prefix, size_t v_cap) override { degree_list_.open_with_hugepages(prefix + ".deg", v_cap); load_meta(prefix); nbr_list_.open_with_hugepages(prefix + ".nbr"); adj_lists_.reset(); v_cap = std::max(v_cap, degree_list_.size()); adj_lists_.resize(v_cap); size_t old_degree_size = degree_list_.size(); degree_list_.resize(v_cap); nbr_t* ptr = nbr_list_.data(); for (size_t i = 0; i < old_degree_size; ++i) { int deg = degree_list_[i]; if (deg != 0) { adj_lists_[i] = ptr; } else { adj_lists_[i] = NULL; } ptr += deg; } for (size_t i = old_degree_size; i < degree_list_.size(); ++i) { degree_list_[i] = 0; adj_lists_[i] = NULL; } } void dump(const std::string& name, const std::string& new_snapshot_dir) override { dump_meta(new_snapshot_dir + "/" + name); size_t vnum = adj_lists_.size(); { FILE* fout = fopen((new_snapshot_dir + "/" + name + ".deg").c_str(), "wb"); fwrite(degree_list_.data(), sizeof(int), vnum, fout); fflush(fout); fclose(fout); } { FILE* fout = fopen((new_snapshot_dir + "/" + name + ".nbr").c_str(), "wb"); for (size_t k = 0; k < vnum; ++k) { if (adj_lists_[k] != NULL && degree_list_[k] != 0) { fwrite(adj_lists_[k], sizeof(nbr_t), degree_list_[k], fout); } } fflush(fout); fclose(fout); } } void warmup(int thread_num) const override {} void resize(vid_t vnum) override { if (vnum > adj_lists_.size()) { size_t old_size = adj_lists_.size(); adj_lists_.resize(vnum); degree_list_.resize(vnum); for (size_t k = old_size; k != vnum; ++k) { adj_lists_[k] = NULL; degree_list_[k] = 0; } } else { adj_lists_.resize(vnum); degree_list_.resize(vnum); } } size_t size() const override { return adj_lists_.size(); } size_t edge_num() const override { size_t ret = 0; for (size_t i = 0; i < adj_lists_.size(); ++i) { ret += degree_list_[i]; } return ret; } std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override { return std::make_shared<ImmutableCsrConstEdgeIter<EDATA_T>>(get_edges(v)); } CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override { return new ImmutableCsrConstEdgeIter<EDATA_T>(get_edges(v)); } std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override { return nullptr; } void put_edge(vid_t src, vid_t dst, const EDATA_T& data, timestamp_t ts, Allocator& alloc) override { LOG(FATAL) << "Put single edge is not supported"; } slice_t get_edges(vid_t v) const override { slice_t ret; ret.set_begin(adj_lists_[v]); ret.set_size(degree_list_[v]); return ret; } void close() override { adj_lists_.reset(); degree_list_.reset(); nbr_list_.reset(); } private: void load_meta(const std::string& prefix) { std::string meta_file_path = prefix + ".meta"; if (std::filesystem::exists(meta_file_path)) { FILE* meta_file_fd = fopen(meta_file_path.c_str(), "r"); CHECK_EQ(fread(&unsorted_since_, sizeof(timestamp_t), 1, meta_file_fd), 1); fclose(meta_file_fd); } else { unsorted_since_ = 0; } } void dump_meta(const std::string& prefix) const { std::string meta_file_path = prefix + ".meta"; FILE* meta_file_fd = fopen((prefix + ".meta").c_str(), "wb"); CHECK_EQ(fwrite(&unsorted_since_, sizeof(timestamp_t), 1, meta_file_fd), 1); fflush(meta_file_fd); fclose(meta_file_fd); } mmap_array<nbr_t*> adj_lists_; mmap_array<int> degree_list_; mmap_array<nbr_t> nbr_list_; timestamp_t unsorted_since_; }; template <> class ImmutableCsr<std::string_view> : public TypedImmutableCsrBase<std::string_view> { public: using nbr_t = ImmutableNbr<size_t>; using slice_t = ImmutableNbrSlice<std::string_view>; ImmutableCsr(StringColumn& column) : column_(column), csr_() {} ~ImmutableCsr() {} size_t batch_init(const std::string& name, const std::string& work_dir, const std::vector<int>& degree, double reserve_ratio = 1.2) override { return csr_.batch_init(name, work_dir, degree, reserve_ratio); } size_t batch_init_in_memory(const std::vector<int>& degree, double reserve_ratio = 1.2) override { return csr_.batch_init_in_memory(degree, reserve_ratio); } void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts = 0) override { csr_.batch_put_edge(src, dst, data, ts); } void open(const std::string& name, const std::string& snapshot_dir, const std::string& work_dir) override { csr_.open(name, snapshot_dir, work_dir); } void open_in_memory(const std::string& prefix, size_t v_cap) override { csr_.open_in_memory(prefix, v_cap); } void open_with_hugepages(const std::string& prefix, size_t v_cap) override { csr_.open_with_hugepages(prefix, v_cap); } void dump(const std::string& name, const std::string& new_snapshot_dir) override { csr_.dump(name, new_snapshot_dir); } void warmup(int thread_num) const override { csr_.warmup(thread_num); } void resize(vid_t vnum) override { csr_.resize(vnum); } size_t size() const override { return csr_.size(); } std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override { return std::make_shared<ImmutableCsrConstEdgeIter<std::string_view>>( get_edges(v)); } CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override { return new ImmutableCsrConstEdgeIter<std::string_view>(get_edges(v)); } std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override { return nullptr; } void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator& alloc) { csr_.put_edge(src, dst, data, ts, alloc); } void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator& alloc) override { csr_.put_edge(src, dst, index, ts, alloc); } slice_t get_edges(vid_t i) const override { return slice_t(csr_.get_edges(i), column_); } void close() override { csr_.close(); } size_t edge_num() const override { return csr_.edge_num(); } private: StringColumn& column_; ImmutableCsr<size_t> csr_; }; template <> class ImmutableCsr<RecordView> : public TypedImmutableCsrBase<RecordView> { public: using nbr_t = ImmutableNbr<size_t>; using slice_t = ImmutableNbrSlice<RecordView>; ImmutableCsr(Table& table) : table_(table), csr_() {} ~ImmutableCsr() {} size_t batch_init(const std::string& name, const std::string& work_dir, const std::vector<int>& degree, double reserve_ratio = 1.2) override { return csr_.batch_init(name, work_dir, degree, reserve_ratio); } size_t batch_init_in_memory(const std::vector<int>& degree, double reserve_ratio = 1.2) override { return csr_.batch_init_in_memory(degree, reserve_ratio); } void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts = 0) override { csr_.batch_put_edge(src, dst, data, ts); } void open(const std::string& name, const std::string& snapshot_dir, const std::string& work_dir) override { csr_.open(name, snapshot_dir, work_dir); } void open_in_memory(const std::string& prefix, size_t v_cap) override { csr_.open_in_memory(prefix, v_cap); } void open_with_hugepages(const std::string& prefix, size_t v_cap) override { csr_.open_with_hugepages(prefix, v_cap); } void dump(const std::string& name, const std::string& new_snapshot_dir) override { csr_.dump(name, new_snapshot_dir); } void warmup(int thread_num) const override { csr_.warmup(thread_num); } void resize(vid_t vnum) override { csr_.resize(vnum); } size_t size() const override { return csr_.size(); } std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override { return std::make_shared<ImmutableCsrConstEdgeIter<RecordView>>( get_edges(v)); } CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override { return new ImmutableCsrConstEdgeIter<RecordView>(get_edges(v)); } std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override { return nullptr; } void put_edge(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator& alloc) { csr_.put_edge(src, dst, data, ts, alloc); } void put_edge_with_index(vid_t src, vid_t dst, size_t index, timestamp_t ts, Allocator& alloc) override { csr_.put_edge(src, dst, index, ts, alloc); } slice_t get_edges(vid_t i) const override { return slice_t(csr_.get_edges(i), table_); } void close() override { csr_.close(); } size_t edge_num() const override { return csr_.edge_num(); } private: Table& table_; ImmutableCsr<size_t> csr_; }; template <typename EDATA_T> class SingleImmutableCsr : public TypedImmutableCsrBase<EDATA_T> { public: using nbr_t = ImmutableNbr<EDATA_T>; using slice_t = ImmutableNbrSlice<EDATA_T>; SingleImmutableCsr() {} ~SingleImmutableCsr() {} size_t batch_init(const std::string& name, const std::string& work_dir, const std::vector<int>& degree, double reserve_ratio) override { size_t vnum = degree.size(); nbr_list_.open(work_dir + "/" + name + ".snbr", true); nbr_list_.resize(vnum); for (size_t k = 0; k != vnum; ++k) { nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max(); } return vnum; } size_t batch_init_in_memory(const std::vector<int>& degree, double reserve_ratio) override { size_t vnum = degree.size(); nbr_list_.open("", false); nbr_list_.resize(vnum); for (size_t k = 0; k != vnum; ++k) { nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max(); } return vnum; } void batch_put_edge(vid_t src, vid_t dst, const EDATA_T& data, timestamp_t ts) override { CHECK_EQ(nbr_list_[src].neighbor, std::numeric_limits<vid_t>::max()); nbr_list_[src].neighbor = dst; nbr_list_[src].data = data; } void batch_sort_by_edge_data(timestamp_t ts) override {} timestamp_t unsorted_since() const override { return std::numeric_limits<timestamp_t>::max(); } void open(const std::string& name, const std::string& snapshot_dir, const std::string& work_dir) override { if (!std::filesystem::exists(work_dir + "/" + name + ".snbr")) { copy_file(snapshot_dir + "/" + name + ".snbr", work_dir + "/" + name + ".snbr"); } nbr_list_.open(work_dir + "/" + name + ".snbr", true); } void open_in_memory(const std::string& prefix, size_t v_cap) override { nbr_list_.open(prefix + ".snbr", false); if (nbr_list_.size() < v_cap) { size_t old_size = nbr_list_.size(); nbr_list_.reset(); nbr_list_.resize(v_cap); FILE* fin = fopen((prefix + ".snbr").c_str(), "r"); CHECK_EQ(fread(nbr_list_.data(), sizeof(nbr_t), old_size, fin), old_size); fclose(fin); for (size_t k = old_size; k != v_cap; ++k) { nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max(); } } } void open_with_hugepages(const std::string& prefix, size_t v_cap) override { nbr_list_.open_with_hugepages(prefix + ".snbr", v_cap); size_t old_size = nbr_list_.size(); if (old_size < v_cap) { nbr_list_.resize(v_cap); for (size_t k = old_size; k != v_cap; ++k) { nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max(); } } } void dump(const std::string& name, const std::string& new_snapshot_dir) override { if (!nbr_list_.filename().empty() && std::filesystem::exists(nbr_list_.filename())) { std::filesystem::create_hard_link( nbr_list_.filename(), new_snapshot_dir + "/" + name + ".snbr"); } else { FILE* fp = fopen((new_snapshot_dir + "/" + name + ".snbr").c_str(), "wb"); fwrite(nbr_list_.data(), sizeof(nbr_t), nbr_list_.size(), fp); fflush(fp); fclose(fp); } } void warmup(int thread_num) const override { size_t vnum = nbr_list_.size(); std::vector<std::thread> threads; std::atomic<size_t> v_i(0); std::atomic<size_t> output(0); const size_t chunk = 4096; for (int i = 0; i < thread_num; ++i) { threads.emplace_back([&]() { size_t ret = 0; while (true) { size_t begin = std::min(v_i.fetch_add(chunk), vnum); size_t end = std::min(begin + chunk, vnum); if (begin == end) { break; } while (begin < end) { auto& nbr = nbr_list_[begin]; ret += nbr.neighbor; ++begin; } } output.fetch_add(ret); }); } for (auto& thrd : threads) { thrd.join(); } (void) output.load(); } void resize(vid_t vnum) override { if (vnum > nbr_list_.size()) { size_t old_size = nbr_list_.size(); nbr_list_.resize(vnum); for (size_t k = old_size; k != vnum; ++k) { nbr_list_[k].neighbor = std::numeric_limits<vid_t>::max(); } } else { nbr_list_.resize(vnum); } } size_t size() const override { return nbr_list_.size(); } size_t edge_num() const override { size_t ret = 0; for (size_t i = 0; i < nbr_list_.size(); ++i) { if (nbr_list_[i].neighbor != std::numeric_limits<vid_t>::max()) { ++ret; } } return ret; } std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override { return std::make_shared<ImmutableCsrConstEdgeIter<EDATA_T>>(get_edges(v)); } CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override { return new ImmutableCsrConstEdgeIter<EDATA_T>(get_edges(v)); } std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override { return nullptr; } void put_edge(vid_t src, vid_t dst, const EDATA_T& data, timestamp_t ts, Allocator&) override { CHECK_LT(src, nbr_list_.size()); CHECK_EQ(nbr_list_[src].neighbor, std::numeric_limits<vid_t>::max()); nbr_list_[src].neighbor = dst; nbr_list_[src].data = data; } slice_t get_edges(vid_t i) const override { slice_t ret; ret.set_size( nbr_list_[i].neighbor == std::numeric_limits<vid_t>::max() ? 0 : 1); if (ret.size() != 0) { ret.set_begin(&nbr_list_[i]); } return ret; } void close() override { nbr_list_.reset(); } const nbr_t& get_edge(vid_t i) const { return nbr_list_[i]; } private: mmap_array<nbr_t> nbr_list_; }; template <> class SingleImmutableCsr<std::string_view> : public TypedImmutableCsrBase<std::string_view> { public: using nbr_t = ImmutableNbr<size_t>; using slice_t = ImmutableNbrSlice<std::string_view>; SingleImmutableCsr(StringColumn& column) : column_(column), csr_() {} ~SingleImmutableCsr() {} size_t batch_init(const std::string& name, const std::string& work_dir, const std::vector<int>& degree, double reserve_ratio) override { return csr_.batch_init(name, work_dir, degree, reserve_ratio); } size_t batch_init_in_memory(const std::vector<int>& degree, double reserve_ratio) override { return csr_.batch_init_in_memory(degree, reserve_ratio); } void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts) override { csr_.batch_put_edge(src, dst, data, ts); } void batch_sort_by_edge_data(timestamp_t ts) override {} timestamp_t unsorted_since() const override { return std::numeric_limits<timestamp_t>::max(); } void open(const std::string& name, const std::string& snapshot_dir, const std::string& work_dir) override { csr_.open(name, snapshot_dir, work_dir); } void open_in_memory(const std::string& prefix, size_t v_cap) override { csr_.open_in_memory(prefix, v_cap); } void open_with_hugepages(const std::string& prefix, size_t v_cap) override { csr_.open_with_hugepages(prefix, v_cap); } void dump(const std::string& name, const std::string& new_snapshot_dir) override { csr_.dump(name, new_snapshot_dir); } void warmup(int thread_num) const override { csr_.warmup(thread_num); } void resize(vid_t vnum) override { csr_.resize(vnum); } size_t size() const override { return csr_.size(); } size_t edge_num() const override { return csr_.edge_num(); } std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override { return std::make_shared<ImmutableCsrConstEdgeIter<std::string_view>>( get_edges(v)); } CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override { return new ImmutableCsrConstEdgeIter<std::string_view>(get_edges(v)); } std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override { return nullptr; } void put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator& alloc) override { csr_.put_edge(src, dst, data, ts, alloc); } slice_t get_edges(vid_t i) const override { return slice_t(csr_.get_edges(i), column_); } ImmutableNbr<std::string_view> get_edge(vid_t i) const { ImmutableNbr<std::string_view> nbr; nbr.neighbor = csr_.get_edge(i).neighbor; size_t index = csr_.get_edge(i).data; nbr.data = column_.get_view(index); return nbr; } void close() override { csr_.close(); } private: StringColumn& column_; SingleImmutableCsr<size_t> csr_; }; template <> class SingleImmutableCsr<RecordView> : public TypedImmutableCsrBase<RecordView> { public: using nbr_t = ImmutableNbr<size_t>; using slice_t = ImmutableNbrSlice<RecordView>; SingleImmutableCsr(Table& table) : table_(table), csr_() {} ~SingleImmutableCsr() {} size_t batch_init(const std::string& name, const std::string& work_dir, const std::vector<int>& degree, double reserve_ratio) override { return csr_.batch_init(name, work_dir, degree, reserve_ratio); } size_t batch_init_in_memory(const std::vector<int>& degree, double reserve_ratio) override { return csr_.batch_init_in_memory(degree, reserve_ratio); } void batch_put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts) override { csr_.batch_put_edge(src, dst, data, ts); } void batch_sort_by_edge_data(timestamp_t ts) override {} timestamp_t unsorted_since() const override { return std::numeric_limits<timestamp_t>::max(); } void open(const std::string& name, const std::string& snapshot_dir, const std::string& work_dir) override { csr_.open(name, snapshot_dir, work_dir); } void open_in_memory(const std::string& prefix, size_t v_cap) override { csr_.open_in_memory(prefix, v_cap); } void open_with_hugepages(const std::string& prefix, size_t v_cap) override { csr_.open_with_hugepages(prefix, v_cap); } void dump(const std::string& name, const std::string& new_snapshot_dir) override { csr_.dump(name, new_snapshot_dir); } void warmup(int thread_num) const override { csr_.warmup(thread_num); } void resize(vid_t vnum) override { csr_.resize(vnum); } size_t size() const override { return csr_.size(); } size_t edge_num() const override { return csr_.edge_num(); } std::shared_ptr<CsrConstEdgeIterBase> edge_iter(vid_t v) const override { return std::make_shared<ImmutableCsrConstEdgeIter<RecordView>>( get_edges(v)); } CsrConstEdgeIterBase* edge_iter_raw(vid_t v) const override { return new ImmutableCsrConstEdgeIter<RecordView>(get_edges(v)); } std::shared_ptr<CsrEdgeIterBase> edge_iter_mut(vid_t v) override { return nullptr; } void put_edge_with_index(vid_t src, vid_t dst, size_t data, timestamp_t ts, Allocator& alloc) override { csr_.put_edge(src, dst, data, ts, alloc); } slice_t get_edges(vid_t i) const override { return slice_t(csr_.get_edges(i), table_); } ImmutableNbr<RecordView> get_edge(vid_t i) const { ImmutableNbr<RecordView> nbr; nbr.neighbor = csr_.get_edge(i).neighbor; nbr.data = RecordView(csr_.get_edge(i).data, &table_); return nbr; } void close() override { csr_.close(); } private: Table& table_; SingleImmutableCsr<size_t> csr_; }; } // namespace gs #endif // STORAGES_RT_MUTABLE_GRAPH_CSR_IMMUTABLE_CSR_H_