flex/engines/graph_db/runtime/common/columns/vertex_columns.h (414 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef RUNTIME_COMMON_COLUMNS_VERTEX_COLUMNS_H_ #define RUNTIME_COMMON_COLUMNS_VERTEX_COLUMNS_H_ #include "flex/engines/graph_db/runtime/common/columns/i_context_column.h" namespace gs { namespace runtime { enum class VertexColumnType { kSingle, kMultiSegment, kMultiple, }; class IVertexColumn : public IContextColumn { public: IVertexColumn() = default; virtual ~IVertexColumn() = default; ContextColumnType column_type() const override { return ContextColumnType::kVertex; } virtual VertexColumnType vertex_column_type() const = 0; virtual VertexRecord get_vertex(size_t idx) const = 0; RTAny get_elem(size_t idx) const override { return RTAny::from_vertex(this->get_vertex(idx)); } RTAnyType elem_type() const override { return RTAnyType::kVertex; } virtual std::set<label_t> get_labels_set() const = 0; }; class IVertexColumnBuilder : public IContextColumnBuilder { public: IVertexColumnBuilder() = default; virtual ~IVertexColumnBuilder() = default; virtual void push_back_vertex(VertexRecord v) = 0; void push_back_elem(const RTAny& val) override { this->push_back_vertex(val.as_vertex()); } virtual void push_back_null() { this->push_back_vertex({std::numeric_limits<label_t>::max(), std::numeric_limits<vid_t>::max()}); } }; class SLVertexColumnBuilder; class SLVertexColumnBase : public IVertexColumn {}; class MLVertexColumnBase : public IVertexColumn {}; class SLVertexColumn : public SLVertexColumnBase { public: SLVertexColumn(label_t label) : label_(label) {} ~SLVertexColumn() = default; inline size_t size() const override { return vertices_.size(); } std::string column_info() const override { return "SLVertexColumn(" + std::to_string(label_) + ")[" + std::to_string(size()) + "]"; } inline VertexColumnType vertex_column_type() const override { return VertexColumnType::kSingle; } std::shared_ptr<IContextColumn> shuffle( const std::vector<size_t>& offsets) const override; std::shared_ptr<IContextColumn> optional_shuffle( const std::vector<size_t>& offset) const override; inline VertexRecord get_vertex(size_t idx) const override { return {label_, vertices_[idx]}; } std::shared_ptr<IContextColumn> union_col( std::shared_ptr<IContextColumn> other) const override; void generate_dedup_offset(std::vector<size_t>& offsets) const override; std::pair<std::shared_ptr<IContextColumn>, std::vector<std::vector<size_t>>> generate_aggregate_offset() const override; template <typename FUNC_T> void foreach_vertex(const FUNC_T& func) const { size_t num = vertices_.size(); for (size_t k = 0; k < num; ++k) { func(k, label_, vertices_[k]); } } std::set<label_t> get_labels_set() const override { std::set<label_t> ret; ret.insert(label_); return ret; } inline label_t label() const { return label_; } ISigColumn* generate_signature() const override; inline const std::vector<vid_t>& vertices() const { return vertices_; } private: friend class SLVertexColumnBuilder; std::vector<vid_t> vertices_; label_t label_; }; class SLVertexColumnBuilder : public IVertexColumnBuilder { public: static SLVertexColumnBuilder builder(label_t label) { return SLVertexColumnBuilder(label); } static SLVertexColumnBuilder builder(const std::set<label_t>& labels) { return SLVertexColumnBuilder(labels); } static SLVertexColumnBuilder optional_builder(label_t label) { SLVertexColumnBuilder builder(label); builder.is_optional_ = true; return builder; } static SLVertexColumnBuilder optional_builder( const std::set<label_t>& labels) { SLVertexColumnBuilder builder(labels); builder.is_optional_ = true; return builder; } ~SLVertexColumnBuilder() = default; void reserve(size_t size) override { vertices_.reserve(size); } inline void push_back_vertex(VertexRecord v) override { assert(v.label_ == label_); vertices_.push_back(v.vid_); } inline void push_back_opt(vid_t v) { vertices_.push_back(v); } inline void push_back_null() override { assert(is_optional_); vertices_.emplace_back(std::numeric_limits<vid_t>::max()); } std::shared_ptr<IContextColumn> finish( const std::shared_ptr<Arena>&) override; private: SLVertexColumnBuilder(label_t label) : label_(label), is_optional_(false) {} SLVertexColumnBuilder(const std::set<label_t>& labels) : label_(*labels.begin()), is_optional_(false) { assert(labels.size() == 1); } std::vector<vid_t> vertices_; label_t label_; bool is_optional_ = false; }; class OptionalSLVertexColumn : public SLVertexColumnBase { public: OptionalSLVertexColumn(label_t label) : label_(label) {} ~OptionalSLVertexColumn() = default; inline size_t size() const override { return vertices_.size(); } std::string column_info() const override { return "OptionalSLVertex[" + std::to_string(size()) + "]"; } inline VertexColumnType vertex_column_type() const override { return VertexColumnType::kSingle; } std::shared_ptr<IContextColumn> shuffle( const std::vector<size_t>& offsets) const override; std::shared_ptr<IContextColumn> optional_shuffle( const std::vector<size_t>& offset) const override; inline VertexRecord get_vertex(size_t idx) const override { return {label_, vertices_[idx]}; } inline bool is_optional() const override { return true; } inline bool has_value(size_t idx) const override { return vertices_[idx] != std::numeric_limits<vid_t>::max(); } void generate_dedup_offset(std::vector<size_t>& offsets) const override; template <typename FUNC_T> void foreach_vertex(const FUNC_T& func) const { size_t num = vertices_.size(); for (size_t k = 0; k < num; ++k) { func(k, label_, vertices_[k]); } } std::set<label_t> get_labels_set() const override { return {label_}; } ISigColumn* generate_signature() const override; private: friend class SLVertexColumnBuilder; label_t label_; std::vector<vid_t> vertices_; }; class MSVertexColumnBuilder; class MSVertexColumn : public IVertexColumn { public: MSVertexColumn() = default; ~MSVertexColumn() = default; size_t size() const override { size_t ret = 0; for (auto& pair : vertices_) { ret += pair.second.size(); } return ret; } std::string column_info() const override { std::string labels; for (auto label : labels_) { labels += std::to_string(label); labels += ", "; } if (!labels.empty()) { labels.resize(labels.size() - 2); } return "MSVertexColumn(" + labels + ")[" + std::to_string(size()) + "]"; } inline VertexColumnType vertex_column_type() const override { return VertexColumnType::kMultiSegment; } std::shared_ptr<IContextColumn> shuffle( const std::vector<size_t>& offsets) const override; inline VertexRecord get_vertex(size_t idx) const override { for (auto& pair : vertices_) { if (idx < pair.second.size()) { return {pair.first, pair.second[idx]}; } idx -= pair.second.size(); } LOG(FATAL) << "not found..."; return {std::numeric_limits<label_t>::max(), std::numeric_limits<vid_t>::max()}; } template <typename FUNC_T> void foreach_vertex(const FUNC_T& func) const { size_t index = 0; for (auto& pair : vertices_) { label_t label = pair.first; for (auto v : pair.second) { func(index++, label, v); } } } std::set<label_t> get_labels_set() const override { return labels_; } ISigColumn* generate_signature() const override; inline size_t seg_num() const { return vertices_.size(); } inline label_t seg_label(size_t seg_id) const { return vertices_[seg_id].first; } const std::vector<vid_t>& seg_vertices(size_t seg_id) const { return vertices_[seg_id].second; } private: friend class MSVertexColumnBuilder; std::vector<std::pair<label_t, std::vector<vid_t>>> vertices_; std::set<label_t> labels_; }; class MSVertexColumnBuilder : public IVertexColumnBuilder { public: static MSVertexColumnBuilder builder() { return MSVertexColumnBuilder(); } ~MSVertexColumnBuilder() = default; void reserve(size_t size) override {} inline void push_back_vertex(VertexRecord v) override { if (v.label_ == cur_label_) { cur_list_.push_back(v.vid_); } else { if (!cur_list_.empty()) { vertices_.emplace_back(cur_label_, std::move(cur_list_)); cur_list_.clear(); } cur_label_ = v.label_; cur_list_.push_back(v.vid_); } } void start_label(label_t label) { if (!cur_list_.empty() && cur_label_ != label) { vertices_.emplace_back(cur_label_, std::move(cur_list_)); cur_list_.clear(); } cur_label_ = label; } inline void push_back_opt(vid_t v) { cur_list_.push_back(v); } inline void push_back_null() override { LOG(FATAL) << "MSVertexColumnBuilder does not support null value."; } std::shared_ptr<IContextColumn> finish( const std::shared_ptr<Arena>&) override; private: MSVertexColumnBuilder() = default; label_t cur_label_; std::vector<vid_t> cur_list_; std::vector<std::pair<label_t, std::vector<vid_t>>> vertices_; }; class MLVertexColumnBuilder; class MLVertexColumn : public MLVertexColumnBase { public: MLVertexColumn() = default; ~MLVertexColumn() = default; inline size_t size() const override { return vertices_.size(); } std::string column_info() const override { std::string labels; for (auto label : labels_) { labels += std::to_string(label); labels += ", "; } if (!labels.empty()) { labels.resize(labels.size() - 2); } return "MLVertexColumn(" + labels + ")[" + std::to_string(size()) + "]"; } inline VertexColumnType vertex_column_type() const override { return VertexColumnType::kMultiple; } std::shared_ptr<IContextColumn> shuffle( const std::vector<size_t>& offsets) const override; std::shared_ptr<IContextColumn> optional_shuffle( const std::vector<size_t>& offsets) const override; inline VertexRecord get_vertex(size_t idx) const override { return vertices_[idx]; } template <typename FUNC_T> void foreach_vertex(const FUNC_T& func) const { size_t index = 0; for (auto& pair : vertices_) { func(index++, pair.label_, pair.vid_); } } std::set<label_t> get_labels_set() const override { return labels_; } ISigColumn* generate_signature() const override; void generate_dedup_offset(std::vector<size_t>& offsets) const override; private: friend class MLVertexColumnBuilder; std::vector<VertexRecord> vertices_; std::set<label_t> labels_; }; class MLVertexColumnBuilder : public IVertexColumnBuilder { public: static MLVertexColumnBuilder builder() { return MLVertexColumnBuilder(); } static MLVertexColumnBuilder optional_builder() { MLVertexColumnBuilder builder; builder.is_optional_ = true; return builder; } static MLVertexColumnBuilder builder(const std::set<label_t>& labels) { return MLVertexColumnBuilder(labels); } static MLVertexColumnBuilder optional_builder( const std::set<label_t>& labels) { MLVertexColumnBuilder builder(labels); builder.is_optional_ = true; return builder; } ~MLVertexColumnBuilder() = default; void reserve(size_t size) override { vertices_.reserve(size); } inline void push_back_opt(VertexRecord v) { labels_.insert(v.label_); vertices_.push_back(v); } inline void push_back_vertex(VertexRecord v) override { labels_.insert(v.label_); vertices_.push_back(v); } std::shared_ptr<IContextColumn> finish( const std::shared_ptr<Arena>&) override; private: MLVertexColumnBuilder() : is_optional_(false) {} MLVertexColumnBuilder(const std::set<label_t>& labels) : labels_(labels), is_optional_(false) {} std::vector<VertexRecord> vertices_; std::set<label_t> labels_; bool is_optional_; }; class OptionalMLVertexColumn : public MLVertexColumnBase { public: OptionalMLVertexColumn() = default; ~OptionalMLVertexColumn() = default; inline size_t size() const override { return vertices_.size(); } std::string column_info() const override { std::string labels; for (auto label : labels_) { labels += std::to_string(label); labels += ", "; } if (!labels.empty()) { labels.resize(labels.size() - 2); } return "OptionalMLVertexColumn(" + labels + ")[" + std::to_string(size()) + "]"; } inline VertexColumnType vertex_column_type() const override { return VertexColumnType::kMultiple; } std::shared_ptr<IContextColumn> shuffle( const std::vector<size_t>& offsets) const override; std::shared_ptr<IContextColumn> optional_shuffle( const std::vector<size_t>& offsets) const override; inline VertexRecord get_vertex(size_t idx) const override { return vertices_[idx]; } inline bool is_optional() const override { return true; } inline bool has_value(size_t idx) const override { return vertices_[idx].vid_ != std::numeric_limits<vid_t>::max(); } template <typename FUNC_T> void foreach_vertex(const FUNC_T& func) const { size_t index = 0; for (auto& pair : vertices_) { func(index++, pair.label_, pair.vid_); } } std::set<label_t> get_labels_set() const override { return labels_; } private: friend class MLVertexColumnBuilder; std::vector<VertexRecord> vertices_; std::set<label_t> labels_; }; template <typename FUNC_T> void foreach_vertex(const IVertexColumn& col, const FUNC_T& func) { if (col.vertex_column_type() == VertexColumnType::kSingle) { if (!col.is_optional()) { const SLVertexColumn& ref = dynamic_cast<const SLVertexColumn&>(col); ref.foreach_vertex(func); } else { const OptionalSLVertexColumn& ref = dynamic_cast<const OptionalSLVertexColumn&>(col); ref.foreach_vertex(func); } } else if (col.vertex_column_type() == VertexColumnType::kMultiple) { if (!col.is_optional()) { const MLVertexColumn& ref = dynamic_cast<const MLVertexColumn&>(col); ref.foreach_vertex(func); } else { const OptionalMLVertexColumn& ref = dynamic_cast<const OptionalMLVertexColumn&>(col); ref.foreach_vertex(func); } } else { const MSVertexColumn& ref = dynamic_cast<const MSVertexColumn&>(col); ref.foreach_vertex(func); } } } // namespace runtime } // namespace gs #endif // COMMON_COLUMNS_VERTEX_COLUMNS_H_