flex/engines/graph_db/runtime/execute/ops/retrieve/project.cc (1,423 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "flex/engines/graph_db/runtime/execute/ops/retrieve/project.h"
#include "flex/engines/graph_db/runtime/common/operators/retrieve/order_by.h"
#include "flex/engines/graph_db/runtime/common/operators/retrieve/project.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/order_by_utils.h"
#include "flex/engines/graph_db/runtime/utils/expr.h"
#include "flex/engines/graph_db/runtime/utils/special_predicates.h"

namespace gs {

namespace runtime {

namespace ops {

// Collects the result of a generic (non-optional) expression into a
// ValueColumnBuilder<T>, one row at a time.
template <typename T>
struct ValueCollector {
  // Evaluates the wrapped Expr at a path index and converts the result to T.
  // The arena is shared (via shared_ptr) with the collector so it can be
  // handed to the finished column for view types.
  struct ExprWrapper {
    using V = T;
    ExprWrapper(Expr&& expr)
        : arena(std::make_shared<Arena>()), expr(std::move(expr)) {}

    inline T operator()(size_t idx) const {
      auto val = expr.eval_path(idx, *arena);
      return TypedConverter<T>::to_typed(val);
    }

    mutable std::shared_ptr<Arena> arena;
    Expr expr;
  };
  using EXPR = ExprWrapper;

  ValueCollector(const Context& ctx, const EXPR& expr) : arena_(expr.arena) {
    builder.reserve(ctx.row_num());
  }

  void collect(const EXPR& expr, size_t idx) {
    auto val = expr(idx);
    builder.push_back_opt(val);
  }

  // Only view types get the evaluation arena attached to the finished column;
  // presumably because their values may point into arena-owned memory.
  auto get() {
    if constexpr (gs::runtime::is_view_type<T>::value) {
      return builder.finish(arena_);
    } else {
      return builder.finish(nullptr);
    }
  }

  std::shared_ptr<Arena> arena_;
  ValueColumnBuilder<T> builder;
};

// Reads property `property_name` of type T from a vertex column whose vertices
// all carry a single label (the first label of the column's label set is used).
template <typename VertexColoumn, typename T>
struct SLPropertyExpr {
  using V = T;
  SLPropertyExpr(const GraphReadInterface& graph, const VertexColoumn& column,
                 const std::string& property_name)
      : column(column) {
    auto labels = column.get_labels_set();
    auto& label = *labels.begin();
    property = graph.GetVertexColumn<T>(label, property_name);
    // A null property column means the label has no such typed property.
    is_optional_ = property.is_null();
  }

  inline T operator()(size_t idx) const {
    auto v = column.get_vertex(idx);
    return property.get_view(v.vid_);
  }

  bool is_optional() const { return is_optional_; }

  bool is_optional_;
  const VertexColoumn& column;
  GraphReadInterface::vertex_column_t<T> property;
};

// Reads property `property_name` of type T from a vertex column that may carry
// several labels; one property column is looked up per label.
template <typename VertexColoumn, typename T>
struct MLPropertyExpr {
  using V = T;
  MLPropertyExpr(const GraphReadInterface& graph, const VertexColoumn& vertex,
                 const std::string& property_name)
      : vertex(vertex) {
    auto labels = vertex.get_labels_set();
    int label_num = graph.schema().vertex_label_num();
    // Indexed directly by label id; labels absent from the column stay empty.
    property.resize(label_num);
    is_optional_ = false;
    for (auto label : labels) {
      property[label] = graph.GetVertexColumn<T>(label, property_name);
      if (property[label].is_null()) {
        is_optional_ = true;
      }
    }
  }

  bool is_optional() const { return is_optional_; }

  inline T operator()(size_t idx) const {
    auto v = vertex.get_vertex(idx);
    return property[v.label_].get_view(v.vid_);
  }

  const VertexColoumn& vertex;
  std::vector<GraphReadInterface::vertex_column_t<T>> property;
  bool is_optional_;
};

// Collects the values produced by an SL/ML property accessor into a column.
template <typename EXPR>
struct PropertyValueCollector {
  PropertyValueCollector(const Context& ctx) { builder.reserve(ctx.row_num()); }

  void collect(const EXPR& expr, size_t idx) {
    builder.push_back_opt(expr(idx));
  }

  auto get() { return builder.finish(nullptr); }

  ValueColumnBuilder<typename EXPR::V> builder;
};

// Wraps a typed property accessor (SLPropertyExpr / MLPropertyExpr) into a
// ProjectExpr backed by a PropertyValueCollector.
// Returns nullptr when the accessor reports is_optional() (some label's
// property column is null), so callers can fall back to the generic path.
template <typename PropExpr>
static std::unique_ptr<ProjectExprBase> wrap_property_expr(const Context& ctx,
                                                           PropExpr expr,
                                                           int alias) {
  if (expr.is_optional()) {
    return nullptr;
  }
  PropertyValueCollector<PropExpr> collector(ctx);
  return std::make_unique<
      ProjectExpr<PropExpr, PropertyValueCollector<PropExpr>>>(
      std::move(expr), collector, alias);
}

// Builds a fast-path property projection over a single-label vertex column.
// Returns nullptr for unsupported `type` or when the property column is null.
template <typename VertexColumn>
std::unique_ptr<ProjectExprBase> create_sl_property_expr(
    const Context& ctx, const GraphReadInterface& graph,
    const VertexColumn& column, const std::string& property_name,
    RTAnyType type, int alias) {
  switch (type) {
  case RTAnyType::kI32Value:
    return wrap_property_expr(
        ctx,
        SLPropertyExpr<VertexColumn, int32_t>(graph, column, property_name),
        alias);
  case RTAnyType::kI64Value:
    return wrap_property_expr(
        ctx,
        SLPropertyExpr<VertexColumn, int64_t>(graph, column, property_name),
        alias);
  case RTAnyType::kF64Value:
    return wrap_property_expr(
        ctx, SLPropertyExpr<VertexColumn, double>(graph, column, property_name),
        alias);
  case RTAnyType::kStringValue:
    return wrap_property_expr(ctx,
                              SLPropertyExpr<VertexColumn, std::string_view>(
                                  graph, column, property_name),
                              alias);
  case RTAnyType::kDate32:
    return wrap_property_expr(
        ctx, SLPropertyExpr<VertexColumn, Day>(graph, column, property_name),
        alias);
  case RTAnyType::kTimestamp:
    return wrap_property_expr(
        ctx, SLPropertyExpr<VertexColumn, Date>(graph, column, property_name),
        alias);
  default:
    LOG(INFO) << "not implemented - " << static_cast<int>(type);
  }
  return nullptr;
}

// Builds a fast-path property projection over a multi-label vertex column.
// Returns nullptr for unsupported `type` or when any label's property column
// is null.
template <typename VertexColumn>
std::unique_ptr<ProjectExprBase> create_ml_property_expr(
    const Context& ctx, const GraphReadInterface& graph,
    const VertexColumn& column, const std::string& property_name,
    RTAnyType type, int alias) {
  switch (type) {
  case RTAnyType::kI32Value:
    return wrap_property_expr(
        ctx,
        MLPropertyExpr<VertexColumn, int32_t>(graph, column, property_name),
        alias);
  case RTAnyType::kI64Value:
    return wrap_property_expr(
        ctx,
        MLPropertyExpr<VertexColumn, int64_t>(graph, column, property_name),
        alias);
  case RTAnyType::kDate32:
    return wrap_property_expr(
        ctx, MLPropertyExpr<VertexColumn, Day>(graph, column, property_name),
        alias);
  case RTAnyType::kTimestamp:
    return wrap_property_expr(
        ctx, MLPropertyExpr<VertexColumn, Date>(graph, column, property_name),
        alias);
  default:
    LOG(INFO) << "not implemented - " << static_cast<int>(type);
  }
  return nullptr;
}

// Collects the result of an optional (nullable) expression into an
// OptionalValueColumnBuilder<T>; null evaluations become null entries.
template <typename T>
struct OptionalValueCollector {
  struct OptionalExprWrapper {
    using V = std::optional<T>;
    OptionalExprWrapper(Expr&& expr)
        : arena(std::make_shared<Arena>()), expr(std::move(expr)) {}

    inline std::optional<T> operator()(size_t idx) const {
      auto val = expr.eval_path(idx, *arena, 0);
      if (val.is_null()) {
        return std::nullopt;
      }
      return TypedConverter<T>::to_typed(val);
    }

    mutable std::shared_ptr<Arena> arena;
    Expr expr;
  };
  using EXPR = OptionalExprWrapper;

  OptionalValueCollector(const Context& ctx, const EXPR& expr)
      : arena_(expr.arena) {
    builder.reserve(ctx.row_num());
  }

  void collect(const EXPR& expr, size_t idx) {
    auto val = expr(idx);
    if (!val.has_value()) {
      builder.push_back_null();
    } else {
      builder.push_back_opt(*val, true);
    }
  }

  auto get() { return builder.finish(arena_); }

  std::shared_ptr<Arena> arena_;
  OptionalValueColumnBuilder<T> builder;
};

// Evaluates an expression that yields a vertex.
struct VertexExprWrapper {
  using V = VertexRecord;
  VertexExprWrapper(Expr&& expr) : expr(std::move(expr)) {}

  inline VertexRecord operator()(size_t idx) const {
    return expr.eval_path(idx, arena).as_vertex();
  }

  mutable Arena arena;
  Expr expr;
};

// Collects vertices known to share label `v_label` into a single-label column
// (only the vid is stored).
struct SLVertexCollector {
  using EXPR = VertexExprWrapper;
  SLVertexCollector(label_t v_label)
      : builder(SLVertexColumnBuilder::builder(v_label)) {}

  void collect(const EXPR& expr, size_t idx) {
    auto v = expr(idx);
    builder.push_back_opt(v.vid_);
  }

  auto get() { return builder.finish(nullptr); }

  SLVertexColumnBuilder builder;
};

// Collects vertices of possibly different labels into a multi-label column.
struct MLVertexCollector {
  using EXPR = VertexExprWrapper;
  MLVertexCollector() : builder(MLVertexColumnBuilder::builder()) {}

  void collect(const EXPR& expr, size_t idx) {
    auto v = expr(idx);
    builder.push_back_vertex(v);
  }

  auto get() { return builder.finish(nullptr); }

  MLVertexColumnBuilder builder;
};

// Collects edges produced by an expression into a BDMLEdgeColumnBuilder.
struct EdgeCollector {
  EdgeCollector() : builder(BDMLEdgeColumnBuilder::builder()) {}

  struct EdgeExprWrapper {
    using V = EdgeRecord;
    EdgeExprWrapper(Expr&& expr) : expr(std::move(expr)) {}

    inline EdgeRecord operator()(size_t idx) const {
      return expr.eval_path(idx, arena).as_edge();
    }

    mutable Arena arena;
    Expr expr;
  };
  using EXPR = EdgeExprWrapper;

  void collect(const EXPR& expr, size_t idx) {
    auto e = expr(idx);
    builder.push_back_opt(e);
  }

  auto get() { return builder.finish(nullptr); }

  BDMLEdgeColumnBuilder builder;
};

// Collects list values; the evaluation arena is attached to the finished
// column.
struct ListCollector {
  struct ListExprWrapper {
    using V = List;
    ListExprWrapper(Expr&& expr)
        : arena(std::make_shared<Arena>()), expr(std::move(expr)) {}

    inline List operator()(size_t idx) const {
      return expr.eval_path(idx, *arena).as_list();
    }

    mutable std::shared_ptr<Arena> arena;
    Expr expr;
  };
  using EXPR = ListExprWrapper;

  ListCollector(const Context& ctx, const EXPR& expr)
      : builder_(
            std::make_shared<ListValueColumnBuilder>(expr.expr.elem_type())),
        arena_(expr.arena) {}

  void collect(const EXPR& expr, size_t idx) {
    builder_->push_back_opt(expr(idx));
  }

  auto get() { return builder_->finish(arena_); }

  std::shared_ptr<ListValueColumnBuilder> builder_;
  std::shared_ptr<Arena> arena_;
};

// Collects the RESULT_T produced by a CASE-WHEN style operator (e.g. SPOpr).
template <typename EXPR, typename RESULT_T>
struct CaseWhenCollector {
  CaseWhenCollector(const Context& ctx) : ctx_(ctx) {
    builder.reserve(ctx.row_num());
  }

  void collect(const EXPR& expr, size_t idx) {
    builder.push_back_opt(expr(idx));
  }

  auto get() { return builder.finish(nullptr); }

  const Context& ctx_;
  ValueColumnBuilder<RESULT_T> builder;
};

// "Special predicate" CASE WHEN: for each row, evaluates `pred` on the row's
// vertex (label, vid) and yields `then_value` or `else_value`.
template <typename VERTEX_COL_PTR, typename SP_PRED_T, typename RESULT_T>
struct SPOpr {
  using V = RESULT_T;
  SPOpr(const VERTEX_COL_PTR& vertex_col, SP_PRED_T&& pred, RESULT_T then_value,
        RESULT_T else_value)
      : vertex_col(vertex_col),
        pred(std::move(pred)),
        then_value(then_value),
        else_value(else_value) {}

  inline RESULT_T operator()(size_t idx) const {
    auto v = vertex_col->get_vertex(idx);
    return pred(v.label_, v.vid_) ? then_value : else_value;
  }

  VERTEX_COL_PTR vertex_col;
  SP_PRED_T pred;
  RESULT_T then_value;
  RESULT_T else_value;
};

// Builds an SPOpr-based projection for int32/int64 CASE results.
// Returns nullptr if the THEN/ELSE constants differ in type or are neither
// i32 nor i64.
template <typename PRED>
std::unique_ptr<ProjectExprBase> create_case_when_project(
    const Context& ctx, const std::shared_ptr<IVertexColumn>& vertex_col,
    PRED&& pred, const common::Value& then_value,
    const common::Value& else_value, int alias) {
  if (then_value.item_case() != else_value.item_case()) {
    return nullptr;
  }
  switch (then_value.item_case()) {
  case common::Value::kI32: {
    if (vertex_col->vertex_column_type() == VertexColumnType::kSingle) {
      // Use the concrete single-label column type for the hot path.
      auto typed_vertex_col =
          std::dynamic_pointer_cast<SLVertexColumn>(vertex_col);
      SPOpr opr(typed_vertex_col, std::move(pred), then_value.i32(),
                else_value.i32());
      auto collector = CaseWhenCollector<decltype(opr), int32_t>(ctx);
      return std::make_unique<ProjectExpr<decltype(opr), decltype(collector)>>(
          std::move(opr), collector, alias);
    } else {
      SPOpr opr(vertex_col, std::move(pred), then_value.i32(),
                else_value.i32());
      auto collector = CaseWhenCollector<decltype(opr), int32_t>(ctx);
      return std::make_unique<ProjectExpr<decltype(opr), decltype(collector)>>(
          std::move(opr), collector, alias);
    }
  }
  case common::Value::kI64: {
    SPOpr opr(vertex_col, std::move(pred), then_value.i64(), else_value.i64());
    auto collector = CaseWhenCollector<decltype(opr), int64_t>(ctx);
    return std::make_unique<ProjectExpr<decltype(opr), decltype(collector)>>(
        std::move(opr), collector, alias);
  }
  default:
    LOG(ERROR) << "Unsupported type for case when collector";
    return nullptr;
  }
}

// Wraps an already-built Expr whose result type is T into a ProjectExpr,
// choosing the optional-aware collector when the expression can yield null.
template <typename T>
static std::unique_ptr<ProjectExprBase> _make_project_expr(Expr&& expr,
                                                           int alias,
                                                           const Context& ctx) {
  if (!expr.is_optional()) {
    typename ValueCollector<T>::EXPR wexpr(std::move(expr));
    ValueCollector<T> collector(ctx, wexpr);
    return std::make_unique<
        ProjectExpr<typename ValueCollector<T>::EXPR, ValueCollector<T>>>(
        std::move(wexpr), collector, alias);
  } else {
    typename OptionalValueCollector<T>::EXPR wexpr(std::move(expr));
    OptionalValueCollector<T> collector(ctx, wexpr);
    return std::make_unique<ProjectExpr<
        typename OptionalValueCollector<T>::EXPR, OptionalValueCollector<T>>>(
        std::move(wexpr), collector, alias);
  }
}

// Returns a factory that builds an Expr from `expr` at evaluation time and
// projects it as type T into column `alias`.
template <typename T>
static std::function<std::unique_ptr<ProjectExprBase>(
    const GraphReadInterface& graph,
    const std::map<std::string, std::string>& params, const Context& ctx)>
_make_project_expr(const common::Expression& expr, int alias) {
  return [=](const GraphReadInterface& graph,
             const std::map<std::string, std::string>& params,
             const Context& ctx) -> std::unique_ptr<ProjectExprBase> {
    Expr e(graph, ctx, params, expr, VarType::kPathVar);
    return _make_project_expr<T>(std::move(e), alias, ctx);
  };
}

// True when `expr` is a single variable without a property access (i.e. it
// re-exposes an existing column). `tag` receives the variable's tag, or -1
// when the variable has no tag. `alias` is unused.
bool is_exchange_index(const common::Expression& expr, int alias, int& tag) {
  if (expr.operators().size() == 1 &&
      expr.operators(0).item_case() == common::ExprOpr::kVar) {
    const auto& var = expr.operators(0).var();
    tag = -1;
    if (var.has_property()) {
      return false;
    }
    if (var.has_tag()) {
      tag = var.tag().id();
    }
    return true;
  }
  return false;
}

// Matches the pattern
//   CASE WHEN t.p >= $lower AND t.p < $upper THEN c1 ELSE c2 END
// where both comparisons refer to the same tag `t` and the same property `p`,
// and c1/c2 are constants of the same type. On success, fills all out-params.
bool is_check_property_in_range(const common::Expression& expr, int& tag,
                                std::string& name, std::string& lower,
                                std::string& upper, common::Value& then_value,
                                common::Value& else_value) {
  if (expr.operators_size() == 1 &&
      expr.operators(0).item_case() == common::ExprOpr::kCase) {
    const auto& opr = expr.operators(0).case_();
    if (opr.when_then_expressions_size() != 1) {
      return false;
    }
    const auto& when = opr.when_then_expressions(0).when_expression();
    if (when.operators_size() != 7) {
      return false;
    }
    {
      // operators(0): t.p
      if (!when.operators(0).has_var()) {
        return false;
      }
      const auto& var = when.operators(0).var();
      if (!var.has_tag()) {
        return false;
      }
      tag = var.tag().id();
      if (!var.has_property()) {
        return false;
      }
      if (!var.property().has_key()) {
        return false;
      }
      name = var.property().key().name();
      if (name == "label") {
        return false;
      }
    }
    {
      // operators(1): >=
      const auto& op = when.operators(1);
      if (op.item_case() != common::ExprOpr::kLogical ||
          op.logical() != common::GE) {
        return false;
      }
    }
    // operators(2): $lower
    const auto& lower_param = when.operators(2);
    if (lower_param.item_case() != common::ExprOpr::kParam) {
      return false;
    }
    lower = lower_param.param().name();
    {
      // operators(3): AND
      const auto& op = when.operators(3);
      if (op.item_case() != common::ExprOpr::kLogical ||
          op.logical() != common::AND) {
        return false;
      }
    }
    {
      // operators(4): t.p again -- must be the same tag AND the same property,
      // otherwise this is not a range check on a single property.
      if (!when.operators(4).has_var()) {
        return false;
      }
      const auto& var = when.operators(4).var();
      if (!var.has_tag()) {
        return false;
      }
      if (var.tag().id() != tag) {
        return false;
      }
      if (!var.has_property()) {
        return false;
      }
      if (!var.property().has_key() || name != var.property().key().name()) {
        return false;
      }
    }
    // operators(5): <
    const auto& op = when.operators(5);
    if (op.item_case() != common::ExprOpr::kLogical ||
        op.logical() != common::LT) {
      return false;
    }
    // operators(6): $upper
    const auto& upper_param = when.operators(6);
    if (upper_param.item_case() != common::ExprOpr::kParam) {
      return false;
    }
    upper = upper_param.param().name();

    const auto& then = opr.when_then_expressions(0).then_result_expression();
    if (then.operators_size() != 1) {
      return false;
    }
    if (!then.operators(0).has_const_()) {
      return false;
    }
    then_value = then.operators(0).const_();

    const auto& else_expr = opr.else_result_expression();
    if (else_expr.operators_size() != 1) {
      return false;
    }
    if (!else_expr.operators(0).has_const_()) {
      return false;
    }
    else_value = else_expr.operators(0).const_();
    if (then_value.item_case() != else_value.item_case()) {
      return false;
    }
    return true;
  }
  return false;
}

// Matches the pattern
//   CASE WHEN t.p <op> $target THEN c1 ELSE c2 END
// with <op> one of LT/LE/GT/GE/EQ/NE (reported through `ptype`) and c1/c2
// constants of the same type. On success, fills all out-params.
bool is_check_property_cmp(const common::Expression& expr, int& tag,
                           std::string& name, std::string& target,
                           common::Value& then_value, common::Value& else_value,
                           SPPredicateType& ptype) {
  if (expr.operators_size() == 1 &&
      expr.operators(0).item_case() == common::ExprOpr::kCase) {
    const auto& opr = expr.operators(0).case_();
    if (opr.when_then_expressions_size() != 1) {
      return false;
    }
    const auto& when = opr.when_then_expressions(0).when_expression();
    if (when.operators_size() != 3) {
      return false;
    }
    {
      if (!when.operators(0).has_var()) {
        return false;
      }
      const auto& var = when.operators(0).var();
      if (!var.has_tag()) {
        return false;
      }
      tag = var.tag().id();
      if (!var.has_property()) {
        return false;
      }
      if (!var.property().has_key()) {
        return false;
      }
      name = var.property().key().name();
      if (name == "label") {
        return false;
      }
    }
    {
      const auto& op = when.operators(1);
      if (op.item_case() != common::ExprOpr::kLogical) {
        return false;
      }
      switch (op.logical()) {
      case common::LT:
        ptype = SPPredicateType::kPropertyLT;
        break;
      case common::LE:
        ptype = SPPredicateType::kPropertyLE;
        break;
      case common::GT:
        ptype = SPPredicateType::kPropertyGT;
        break;
      case common::GE:
        ptype = SPPredicateType::kPropertyGE;
        break;
      case common::EQ:
        ptype = SPPredicateType::kPropertyEQ;
        break;
      case common::NE:
        ptype = SPPredicateType::kPropertyNE;
        break;
      default:
        return false;
      }
    }
    const auto& target_param = when.operators(2);
    if (target_param.item_case() != common::ExprOpr::kParam) {
      return false;
    }
    target = target_param.param().name();

    const auto& then = opr.when_then_expressions(0).then_result_expression();
    if (then.operators_size() != 1) {
      return false;
    }
    if (!then.operators(0).has_const_()) {
      return false;
    }
    then_value = then.operators(0).const_();

    const auto& else_expr = opr.else_result_expression();
    if (else_expr.operators_size() != 1) {
      return false;
    }
    if (!else_expr.operators(0).has_const_()) {
      return false;
    }
    else_value = else_expr.operators(0).const_();
    if (then_value.item_case() != else_value.item_case()) {
      return false;
    }
    return true;
  }
  return false;
}

// True when `expr` is a single `tag.property` access (property != "label")
// whose declared node type is one of kTimestamp/kDate32/kI64Value/kI32Value.
// `tag` is -1 when the variable has no tag.
bool is_property_extract(const common::Expression& expr, int& tag,
                         std::string& name, RTAnyType& type) {
  if (expr.operators_size() == 1 &&
      expr.operators(0).item_case() == common::ExprOpr::kVar) {
    const auto& var = expr.operators(0).var();
    tag = -1;
    if (!var.has_property()) {
      return false;
    }
    if (var.has_tag()) {
      tag = var.tag().id();
    }
    if (var.property().has_key()) {
      name = var.property().key().name();
      if (name == "label") {
        return false;
      }
      if (var.has_node_type()) {
        type = parse_from_ir_data_type(var.node_type());
      } else {
        return false;
      }
      if (type == RTAnyType::kUnknown) {
        return false;
      }
      // only support pod type
      if (type == RTAnyType::kTimestamp || type == RTAnyType::kDate32 ||
          type == RTAnyType::kI64Value || type == RTAnyType::kI32Value) {
        return true;
      }
    }
  }
  return false;
}

// Instantiates the comparison predicate selected by `type` over property
// `name` against parameter `params.at(target)` and wraps it in a CASE WHEN
// projection. Returns nullptr when the predicate type or THEN/ELSE type is
// unsupported.
template <typename T>
static std::unique_ptr<ProjectExprBase> create_sp_pred_case_when(
    const Context& ctx, const GraphReadInterface& graph,
    const std::map<std::string, std::string>& params,
    const std::shared_ptr<IVertexColumn>& vertex, SPPredicateType type,
    const std::string& name, const std::string& target,
    const common::Value& then_value, const common::Value& else_value,
    int alias) {
  switch (type) {
  case SPPredicateType::kPropertyLT: {
    VertexPropertyLTPredicateBeta<T> pred(graph, name, params.at(target));
    return create_case_when_project(ctx, vertex, std::move(pred), then_value,
                                    else_value, alias);
  }
  case SPPredicateType::kPropertyGT: {
    VertexPropertyGTPredicateBeta<T> pred(graph, name, params.at(target));
    return create_case_when_project(ctx, vertex, std::move(pred), then_value,
                                    else_value, alias);
  }
  case SPPredicateType::kPropertyLE: {
    VertexPropertyLEPredicateBeta<T> pred(graph, name, params.at(target));
    return create_case_when_project(ctx, vertex, std::move(pred), then_value,
                                    else_value, alias);
  }
  case SPPredicateType::kPropertyGE: {
    VertexPropertyGEPredicateBeta<T> pred(graph, name, params.at(target));
    return create_case_when_project(ctx, vertex, std::move(pred), then_value,
                                    else_value, alias);
  }
  case SPPredicateType::kPropertyEQ: {
    VertexPropertyEQPredicateBeta<T> pred(graph, name, params.at(target));
    return create_case_when_project(ctx, vertex, std::move(pred), then_value,
                                    else_value, alias);
  }
  case SPPredicateType::kPropertyNE: {
    VertexPropertyNEPredicateBeta<T> pred(graph, name, params.at(target));
    return create_case_when_project(ctx, vertex, std::move(pred), then_value,
                                    else_value, alias);
  }
  default:
    return nullptr;
  }
}

// in the case of data_type is not set, we need to infer the type from the
// expr
static std::function<std::unique_ptr<ProjectExprBase>(
    const GraphReadInterface& graph,
    const std::map<std::string, std::string>& params, const Context& ctx)>
make_project_expr(const common::Expression& expr, int alias) {
  return [=](const GraphReadInterface& graph,
             const std::map<std::string, std::string>& params,
             const Context& ctx) -> std::unique_ptr<ProjectExprBase> {
    Expr e(graph, ctx, params, expr, VarType::kPathVar);
    switch (e.type()) {
    case RTAnyType::kI64Value:
      return _make_project_expr<int64_t>(std::move(e), alias, ctx);
    case RTAnyType::kStringValue:
      return _make_project_expr<std::string_view>(std::move(e), alias, ctx);
    case RTAnyType::kDate32:
      return _make_project_expr<Day>(std::move(e), alias, ctx);
    case RTAnyType::kTimestamp:
      return _make_project_expr<Date>(std::move(e), alias, ctx);
    case RTAnyType::kVertex: {
      MLVertexCollector collector;
      collector.builder.reserve(ctx.row_num());
      return std::make_unique<
          ProjectExpr<MLVertexCollector::EXPR, MLVertexCollector>>(
          std::move(e), collector, alias);
    }
    case RTAnyType::kI32Value:
      return _make_project_expr<int32_t>(std::move(e), alias, ctx);
    case RTAnyType::kF64Value:
      return _make_project_expr<double>(std::move(e), alias, ctx);
    case RTAnyType::kBoolValue:
      // Previously fell through to LOG(FATAL); the typed path already
      // supports bool via the same collector.
      return _make_project_expr<bool>(std::move(e), alias, ctx);
    case RTAnyType::kEdge: {
      EdgeCollector collector;
      return std::make_unique<ProjectExpr<EdgeCollector::EXPR, EdgeCollector>>(
          std::move(e), collector, alias);
    }
    case RTAnyType::kTuple:
      return _make_project_expr<Tuple>(std::move(e), alias, ctx);
    case RTAnyType::kList: {
      ListCollector::EXPR list_expr(std::move(e));
      ListCollector collector(ctx, list_expr);
      return std::make_unique<ProjectExpr<ListCollector::EXPR, ListCollector>>(
          std::move(list_expr), collector, alias);
    }
    case RTAnyType::kMap:
      return _make_project_expr<Map>(std::move(e), alias, ctx);
    default:
      LOG(FATAL) << "not support - " << static_cast<int>(e.type());
      break;
    }
    return nullptr;
  };
}

// Recognizes expression shapes that have a specialized (faster) projection:
//   1. a bare variable (re-exposes an existing column),
//   2. a typed vertex property access,
//   3. CASE WHEN t.p >= $lo AND t.p < $hi THEN c1 ELSE c2,
//   4. CASE WHEN t.p <cmp> $x THEN c1 ELSE c2.
// Returns std::nullopt when no special shape matches. The returned factories
// fall back to the generic make_project_expr path whenever the specialized
// construction is not applicable at evaluation time.
static std::optional<std::function<std::unique_ptr<ProjectExprBase>(
    const GraphReadInterface& graph,
    const std::map<std::string, std::string>& params, const Context& ctx)>>
parse_special_expr(const common::Expression& expr, int alias) {
  int tag = -1;
  if (is_exchange_index(expr, alias, tag)) {
    return [=](const GraphReadInterface& graph,
               const std::map<std::string, std::string>& params,
               const Context& ctx) -> std::unique_ptr<ProjectExprBase> {
      return std::make_unique<DummyGetter>(tag, alias);
    };
  }
  {
    int tag;
    std::string name;
    RTAnyType type;
    if (is_property_extract(expr, tag, name, type)) {
      return [=](const GraphReadInterface& graph,
                 const std::map<std::string, std::string>& params,
                 const Context& ctx) -> std::unique_ptr<ProjectExprBase> {
        auto col = ctx.get(tag);
        if ((!col->is_optional()) &&
            col->column_type() == ContextColumnType::kVertex) {
          auto vertex_col = std::dynamic_pointer_cast<IVertexColumn>(col);
          std::unique_ptr<ProjectExprBase> ptr;
          if (vertex_col->get_labels_set().size() == 1) {
            if (vertex_col->vertex_column_type() == VertexColumnType::kSingle) {
              auto typed_vertex_col =
                  std::dynamic_pointer_cast<SLVertexColumn>(vertex_col);
              ptr = create_sl_property_expr(ctx, graph, *typed_vertex_col,
                                            name, type, alias);
            } else {
              ptr = create_sl_property_expr(ctx, graph, *vertex_col, name,
                                            type, alias);
            }
          } else if (vertex_col->vertex_column_type() ==
                     VertexColumnType::kMultiple) {
            auto typed_vertex_col =
                std::dynamic_pointer_cast<MLVertexColumn>(vertex_col);
            ptr = create_ml_property_expr(ctx, graph, *typed_vertex_col, name,
                                          type, alias);
          } else {
            auto typed_vertex_col =
                std::dynamic_pointer_cast<MSVertexColumn>(vertex_col);
            ptr = create_ml_property_expr(ctx, graph, *typed_vertex_col, name,
                                          type, alias);
          }
          // The fast path yields nullptr when the property column is null
          // for some label or the type is unsupported; never hand a null
          // expression to Project::project -- use the generic path instead.
          if (ptr) {
            return ptr;
          }
        }
        return make_project_expr(expr, alias)(graph, params, ctx);
      };
    }
  }

  std::string name, lower, upper, target;
  common::Value then_value, else_value;
  if (is_check_property_in_range(expr, tag, name, lower, upper, then_value,
                                 else_value)) {
    return [=](const GraphReadInterface& graph,
               const std::map<std::string, std::string>& params,
               const Context& ctx) -> std::unique_ptr<ProjectExprBase> {
      auto col = ctx.get(tag);
      if (col->column_type() == ContextColumnType::kVertex) {
        auto vertex_col = std::dynamic_pointer_cast<IVertexColumn>(col);
        // The declared type of the $lower parameter selects the predicate type.
        const auto& type = expr.operators(0)
                               .case_()
                               .when_then_expressions(0)
                               .when_expression()
                               .operators(2)
                               .param()
                               .data_type();
        auto type_ = parse_from_ir_data_type(type);
        // Only int32 THEN/ELSE constants are handled by the fast path.
        if (then_value.item_case() != else_value.item_case() ||
            then_value.item_case() != common::Value::kI32) {
          return make_project_expr(expr, alias)(graph, params, ctx);
        }
        if (type_ == RTAnyType::kI32Value) {
          SPOpr sp(vertex_col,
                   VertexPropertyBetweenPredicateBeta<int32_t>(
                       graph, name, params.at(lower), params.at(upper)),
                   then_value.i32(), else_value.i32());
          CaseWhenCollector<decltype(sp), int32_t> collector(ctx);
          return std::make_unique<
              ProjectExpr<decltype(sp), decltype(collector)>>(
              std::move(sp), collector, alias);
        } else if (type_ == RTAnyType::kI64Value) {
          SPOpr sp(vertex_col,
                   VertexPropertyBetweenPredicateBeta<int64_t>(
                       graph, name, params.at(lower), params.at(upper)),
                   then_value.i32(), else_value.i32());
          CaseWhenCollector<decltype(sp), int32_t> collector(ctx);
          return std::make_unique<
              ProjectExpr<decltype(sp), decltype(collector)>>(
              std::move(sp), collector, alias);
        } else if (type_ == RTAnyType::kTimestamp) {
          if (vertex_col->vertex_column_type() == VertexColumnType::kSingle) {
            auto typed_vertex_col =
                std::dynamic_pointer_cast<SLVertexColumn>(vertex_col);
            SPOpr sp(typed_vertex_col,
                     VertexPropertyBetweenPredicateBeta<Date>(
                         graph, name, params.at(lower), params.at(upper)),
                     then_value.i32(), else_value.i32());
            CaseWhenCollector<decltype(sp), int32_t> collector(ctx);
            return std::make_unique<
                ProjectExpr<decltype(sp), decltype(collector)>>(
                std::move(sp), collector, alias);
          } else {
            SPOpr sp(vertex_col,
                     VertexPropertyBetweenPredicateBeta<Date>(
                         graph, name, params.at(lower), params.at(upper)),
                     then_value.i32(), else_value.i32());
            CaseWhenCollector<decltype(sp), int32_t> collector(ctx);
            return std::make_unique<
                ProjectExpr<decltype(sp), decltype(collector)>>(
                std::move(sp), collector, alias);
          }
        }
      }
      return make_project_expr(expr, alias)(graph, params, ctx);
    };
  }

  SPPredicateType ptype;
  if (is_check_property_cmp(expr, tag, name, target, then_value, else_value,
                            ptype)) {
    return [=](const GraphReadInterface& graph,
               const std::map<std::string, std::string>& params,
               const Context& ctx) -> std::unique_ptr<ProjectExprBase> {
      auto col = ctx.get(tag);
      if (col->column_type() == ContextColumnType::kVertex) {
        auto vertex_col = std::dynamic_pointer_cast<IVertexColumn>(col);
        // The declared type of the compared parameter selects the predicate.
        const auto& type = expr.operators(0)
                               .case_()
                               .when_then_expressions(0)
                               .when_expression()
                               .operators(2)
                               .param()
                               .data_type();
        auto type_ = parse_from_ir_data_type(type);
        std::unique_ptr<ProjectExprBase> ptr;
        if (type_ == RTAnyType::kI32Value) {
          ptr = create_sp_pred_case_when<int32_t>(ctx, graph, params,
                                                  vertex_col, ptype, name,
                                                  target, then_value,
                                                  else_value, alias);
        } else if (type_ == RTAnyType::kI64Value) {
          ptr = create_sp_pred_case_when<int64_t>(ctx, graph, params,
                                                  vertex_col, ptype, name,
                                                  target, then_value,
                                                  else_value, alias);
        } else if (type_ == RTAnyType::kTimestamp) {
          ptr = create_sp_pred_case_when<Date>(ctx, graph, params, vertex_col,
                                               ptype, name, target,
                                               then_value, else_value, alias);
        } else if (type_ == RTAnyType::kStringValue) {
          ptr = create_sp_pred_case_when<std::string_view>(
              ctx, graph, params, vertex_col, ptype, name, target, then_value,
              else_value, alias);
        }
        if (ptr) {
          return ptr;
        }
      }
      return make_project_expr(expr, alias)(graph, params, ctx);
    };
  }
  return std::nullopt;
}

// Builds a projection factory using the declared output `data_type`.
// Returns std::nullopt when the declared type is not handled here, so the
// caller can fall back to type inference.
std::optional<std::function<std::unique_ptr<ProjectExprBase>(
    const GraphReadInterface& graph,
    const std::map<std::string, std::string>& params, const Context& ctx)>>
make_project_expr(const common::Expression& expr,
                  const common::IrDataType& data_type, int alias) {
  switch (data_type.type_case()) {
  case common::IrDataType::kDataType: {
    auto type = parse_from_ir_data_type(data_type);
    switch (type) {
    case RTAnyType::kI64Value:
      return _make_project_expr<int64_t>(expr, alias);
    case RTAnyType::kI32Value:
      return _make_project_expr<int32_t>(expr, alias);
    case RTAnyType::kF64Value:
      return _make_project_expr<double>(expr, alias);
    case RTAnyType::kBoolValue:
      return _make_project_expr<bool>(expr, alias);
    case RTAnyType::kStringValue:
      return _make_project_expr<std::string_view>(expr, alias);
    case RTAnyType::kTimestamp:
      return _make_project_expr<Date>(expr, alias);
    case RTAnyType::kDate32:
      return _make_project_expr<Day>(expr, alias);
    // compiler bug here
    case RTAnyType::kUnknown:
      return make_project_expr(expr, alias);
    default:
      LOG(INFO) << "not support" << data_type.DebugString();
      return std::nullopt;
    }
  }
  case common::IrDataType::kGraphType: {
    const common::GraphDataType& graph_data_type = data_type.graph_type();
    common::GraphDataType_GraphElementOpt elem_opt =
        graph_data_type.element_opt();
    int label_num = graph_data_type.graph_data_type_size();
    if (elem_opt == common::GraphDataType_GraphElementOpt::
                        GraphDataType_GraphElementOpt_VERTEX) {
      if (label_num == 1) {
        label_t v_label = static_cast<label_t>(
            graph_data_type.graph_data_type(0).label().label());
        return [=](const GraphReadInterface& graph,
                   const std::map<std::string, std::string>& params,
                   const Context& ctx) -> std::unique_ptr<ProjectExprBase> {
          Expr e(graph, ctx, params, expr, VarType::kPathVar);
          SLVertexCollector collector(v_label);
          collector.builder.reserve(ctx.row_num());
          return std::make_unique<
              ProjectExpr<SLVertexCollector::EXPR, SLVertexCollector>>(
              std::move(e), collector, alias);
        };
      } else if (label_num > 1) {
        return [=](const GraphReadInterface& graph,
                   const std::map<std::string, std::string>& params,
                   const Context& ctx) -> std::unique_ptr<ProjectExprBase> {
          Expr e(graph, ctx, params, expr, VarType::kPathVar);
          MLVertexCollector collector;
          collector.builder.reserve(ctx.row_num());
          return std::make_unique<
              ProjectExpr<MLVertexCollector::EXPR, MLVertexCollector>>(
              std::move(e), collector, alias);
        };
      } else {
        LOG(INFO) << "unexpected type";
      }
    } else if (elem_opt == common::GraphDataType_GraphElementOpt::
                               GraphDataType_GraphElementOpt_EDGE) {
      return [=](const GraphReadInterface& graph,
                 const std::map<std::string, std::string>& params,
                 const Context& ctx) -> std::unique_ptr<ProjectExprBase> {
        Expr e(graph, ctx, params, expr, VarType::kPathVar);
        EdgeCollector collector;
        return std::make_unique<
            ProjectExpr<EdgeCollector::EXPR, EdgeCollector>>(
            std::move(e), collector, alias);
      };
    } else {
      LOG(INFO) << "unexpected type";
    }
  } break;
  case common::IrDataType::TYPE_NOT_SET: {
    return make_project_expr(expr, alias);
  } break;
  default:
    LOG(INFO) << "unexpected type" << data_type.DebugString();
    break;
  }
  return std::nullopt;
}

// Read operator that evaluates a list of projection factories against the
// incoming context. For each (output column, dependency tags) entry in
// `dependencies`, the output column's arena is replaced by one that references
// its own arena plus the arenas of the dependency columns captured before
// projection.
class ProjectOpr : public IReadOperator {
 public:
  ProjectOpr(const std::vector<std::function<std::unique_ptr<ProjectExprBase>(
                 const GraphReadInterface& graph,
                 const std::map<std::string, std::string>& params,
                 const Context& ctx)>>& exprs,
             const std::vector<std::pair<int, std::set<int>>>& dependencies,
             bool is_append)
      : exprs_(exprs), dependencies_(dependencies), is_append_(is_append) {}

  bl::result<gs::runtime::Context> Eval(
      const gs::runtime::GraphReadInterface& graph,
      const std::map<std::string, std::string>& params,
      gs::runtime::Context&& ctx, gs::runtime::OprTimer& timer) override {
    std::vector<std::unique_ptr<ProjectExprBase>> exprs;
    std::vector<std::shared_ptr<Arena>> arenas;
    if (!dependencies_.empty()) {
      // Snapshot input-column arenas before `ctx` is moved into project().
      arenas.resize(ctx.col_num(), nullptr);
      for (size_t i = 0; i < ctx.col_num(); ++i) {
        if (ctx.get(i)) {
          arenas[i] = ctx.get(i)->get_arena();
        }
      }
    }
    exprs.reserve(exprs_.size());
    for (size_t i = 0; i < exprs_.size(); ++i) {
      exprs.push_back(exprs_[i](graph, params, ctx));
    }
    auto ret = Project::project(std::move(ctx), exprs, is_append_);
    if (!ret) {
      return ret;
    }
    for (const auto& [idx, deps] : dependencies_) {
      std::shared_ptr<Arena> arena = std::make_shared<Arena>();
      auto arena1 = ret.value().get(idx)->get_arena();
      if (arena1) {
        arena->emplace_back(std::make_unique<ArenaRef>(arena1));
      }
      for (int dep : deps) {
        // Guard against tags outside the snapshotted range (e.g. -1 for an
        // untagged variable) instead of indexing out of bounds.
        if (dep < 0 || static_cast<size_t>(dep) >= arenas.size()) {
          continue;
        }
        if (arenas[dep]) {
          arena->emplace_back(std::make_unique<ArenaRef>(arenas[dep]));
        }
      }
      ret.value().get(idx)->set_arena(arena);
    }
    return ret;
  }

  std::string get_operator_name() const override { return "ProjectOpr"; }

 private:
  std::vector<std::function<std::unique_ptr<ProjectExprBase>(
      const GraphReadInterface& graph,
      const std::map<std::string, std::string>& params, const Context& ctx)>>
      exprs_;
  std::vector<std::pair<int, std::set<int>>> dependencies_;
  bool is_append_;
};

// Chooses a projection factory for `expr`: special shapes first, then the
// declared `data_type` (when set and handled), otherwise type inference.
auto _make_project_expr(const common::Expression& expr, int alias,
                        const std::optional<common::IrDataType>& data_type) {
  auto func = parse_special_expr(expr, alias);
  if (func.has_value()) {
    return func.value();
  }
  if (data_type.has_value() &&
      data_type.value().type_case() != common::IrDataType::TYPE_NOT_SET) {
    auto typed_func = make_project_expr(expr, data_type.value(), alias);
    if (typed_func.has_value()) {
      return typed_func.value();
    }
  }
  return make_project_expr(expr, alias);
}

// Sets `tag` to the variable's tag (-1 if untagged) for property-free
// variables.
// NOTE(review): this always returns false, so parse_potential_dependencies
// never records a dependency. If that is unintended, note that returning true
// would yield tag -1 for untagged variables -- confirm the consumers of the
// dependency set handle that before changing it.
bool check_identities(const common::Variable& var, int& tag) {
  if (var.has_property()) {
    return false;
  }
  tag = var.has_tag() ? var.tag().id() : -1;
  return false;
}

// Adds to `dependencies` the tags of property-free variables referenced by a
// single-operator expression (a var, a var list, or the WHEN/ELSE parts of a
// CASE), as accepted by check_identities.
void parse_potential_dependencies(const common::Expression& expr,
                                  std::set<int>& dependencies) {
  // operators(0) is read below, so empty expressions must be rejected too.
  if (expr.operators_size() != 1) {
    return;
  }
  const auto& op0 = expr.operators(0);
  if (op0.item_case() == common::ExprOpr::kVar) {
    int tag;
    if (check_identities(op0.var(), tag)) {
      dependencies.insert(tag);
    }
  } else if (op0.item_case() == common::ExprOpr::kVars) {
    int len = op0.vars().keys_size();
    for (int i = 0; i < len; ++i) {
      int tag;
      if (check_identities(op0.vars().keys(i), tag)) {
        dependencies.insert(tag);
      }
    }
  } else if (op0.item_case() == common::ExprOpr::kCase) {
    const auto& opr = op0.case_();
    int when_size = opr.when_then_expressions_size();
    for (int i = 0; i < when_size; ++i) {
      const auto& when = opr.when_then_expressions(i).when_expression();
      if (when.operators_size() == 1) {
        parse_potential_dependencies(when, dependencies);
      }
    }
    const auto& else_expr = opr.else_result_expression();
    if (else_expr.operators_size() == 1) {
      parse_potential_dependencies(else_expr, dependencies);
    }
  }
}

bl::result<ReadOpBuildResultT> ProjectOprBuilder::Build(
    const gs::Schema& schema, const ContextMeta& ctx_meta, const
physical::PhysicalPlan& plan, int op_idx) { std::vector<common::IrDataType> data_types; int mappings_size = plan.plan(op_idx).opr().project().mappings_size(); std::vector<std::function<std::unique_ptr<ProjectExprBase>( const GraphReadInterface& graph, const std::map<std::string, std::string>& params, const Context& ctx)>> exprs; ContextMeta ret_meta; bool is_append = plan.plan(op_idx).opr().project().is_append(); if (is_append) { ret_meta = ctx_meta; } std::vector<std::pair<int, std::set<int>>> dependencies; if (plan.plan(op_idx).meta_data_size() == mappings_size) { for (int i = 0; i < plan.plan(op_idx).meta_data_size(); ++i) { data_types.push_back(plan.plan(op_idx).meta_data(i).type()); const auto& m = plan.plan(op_idx).opr().project().mappings(i); int alias = m.has_alias() ? m.alias().value() : -1; ret_meta.set(alias); if (!m.has_expr()) { LOG(ERROR) << "expr is not set" << m.DebugString(); return std::make_pair(nullptr, ret_meta); } auto expr = m.expr(); std::set<int> dependencies_set; parse_potential_dependencies(expr, dependencies_set); if (!dependencies_set.empty()) { dependencies.emplace_back(alias, dependencies_set); } exprs.emplace_back(_make_project_expr(expr, alias, data_types[i])); } } else { for (int i = 0; i < mappings_size; ++i) { auto& m = plan.plan(op_idx).opr().project().mappings(i); int alias = m.has_alias() ? 
m.alias().value() : -1; ret_meta.set(alias); if (!m.has_expr()) { LOG(ERROR) << "expr is not set" << m.DebugString(); return std::make_pair(nullptr, ret_meta); } auto expr = m.expr(); std::set<int> dependencies_set; parse_potential_dependencies(expr, dependencies_set); if (!dependencies_set.empty()) { dependencies.emplace_back(alias, dependencies_set); } exprs.emplace_back(_make_project_expr(expr, alias, std::nullopt)); } } return std::make_pair( std::make_unique<ProjectOpr>(std::move(exprs), dependencies, is_append), ret_meta); } class ProjectOrderByOprBeta : public IReadOperator { public: ProjectOrderByOprBeta( const std::vector<std::function<std::unique_ptr<ProjectExprBase>( const GraphReadInterface& graph, const std::map<std::string, std::string>& params, const Context& ctx)>>& exprs, const std::vector<std::pair<int, std::set<int>>>& dependencies, const std::set<int>& order_by_keys, const std::vector<std::pair<common::Variable, bool>>& order_by_pairs, int lower_bound, int upper_bound, const std::tuple<int, int, bool>& first_pair) : exprs_(exprs), dependencies_(dependencies), order_by_keys_(order_by_keys), order_by_pairs_(order_by_pairs), lower_bound_(lower_bound), upper_bound_(upper_bound), first_pair_(first_pair) {} std::string get_operator_name() const override { return "ProjectOrderByOprBeta"; } bl::result<gs::runtime::Context> Eval( const gs::runtime::GraphReadInterface& graph, const std::map<std::string, std::string>& params, gs::runtime::Context&& ctx, gs::runtime::OprTimer& timer) override { std::vector<std::shared_ptr<Arena>> arenas; if (!dependencies_.empty()) { arenas.resize(ctx.col_num(), nullptr); for (size_t i = 0; i < ctx.col_num(); ++i) { if (ctx.get(i)) { arenas[i] = ctx.get(i)->get_arena(); } } } auto cmp_func = [&](const Context& ctx) -> GeneralComparer { GeneralComparer cmp; for (const auto& pair : order_by_pairs_) { Var v(graph, ctx, pair.first, VarType::kPathVar); cmp.add_keys(std::move(v), pair.second); } return cmp; }; auto ret = 
Project::project_order_by_fuse<GeneralComparer>( graph, params, std::move(ctx), exprs_, cmp_func, lower_bound_, upper_bound_, order_by_keys_, first_pair_); if (!ret) { return ret; } for (auto& [idx, deps] : dependencies_) { std::shared_ptr<Arena> arena = std::make_shared<Arena>(); auto arena1 = ret.value().get(idx)->get_arena(); if (arena1) { arena->emplace_back(std::make_unique<ArenaRef>(arena1)); } for (auto& dep : deps) { if (arenas[dep]) { arena->emplace_back(std::make_unique<ArenaRef>(arenas[dep])); } } ret.value().get(idx)->set_arena(arena); } return ret; } private: std::vector<std::function<std::unique_ptr<ProjectExprBase>( const GraphReadInterface& graph, const std::map<std::string, std::string>& params, const Context& ctx)>> exprs_; std::vector<std::pair<int, std::set<int>>> dependencies_; std::set<int> order_by_keys_; std::vector<std::pair<common::Variable, bool>> order_by_pairs_; int lower_bound_, upper_bound_; std::tuple<int, int, bool> first_pair_; }; static bool project_order_by_fusable_beta( const physical::Project& project_opr, const algebra::OrderBy& order_by_opr, const ContextMeta& ctx_meta, const std::vector<common::IrDataType>& data_types, std::set<int>& order_by_keys) { if (!order_by_opr.has_limit()) { return false; } if (project_opr.is_append()) { return false; } int mappings_size = project_opr.mappings_size(); if (static_cast<size_t>(mappings_size) != data_types.size()) { return false; } std::set<int> new_generate_columns; for (int i = 0; i < mappings_size; ++i) { const physical::Project_ExprAlias& m = project_opr.mappings(i); if (m.has_alias()) { int alias = m.alias().value(); if (ctx_meta.exist(alias)) { return false; } if (new_generate_columns.find(alias) != new_generate_columns.end()) { return false; } new_generate_columns.insert(alias); } } int order_by_keys_num = order_by_opr.pairs_size(); for (int k_i = 0; k_i < order_by_keys_num; ++k_i) { if (!order_by_opr.pairs(k_i).has_key()) { return false; } if 
(!order_by_opr.pairs(k_i).key().has_tag()) { return false; } if (!(order_by_opr.pairs(k_i).key().tag().item_case() == common::NameOrId::ItemCase::kId)) { return false; } order_by_keys.insert(order_by_opr.pairs(k_i).key().tag().id()); } if (data_types.size() == order_by_keys.size()) { return false; } for (auto key : order_by_keys) { if (new_generate_columns.find(key) == new_generate_columns.end() && !ctx_meta.exist(key)) { return false; } } return true; } bl::result<ReadOpBuildResultT> ProjectOrderByOprBuilder::Build( const gs::Schema& schema, const ContextMeta& ctx_meta, const physical::PhysicalPlan& plan, int op_idx) { std::vector<common::IrDataType> data_types; int mappings_size = plan.plan(op_idx).opr().project().mappings_size(); if (plan.plan(op_idx).meta_data_size() == mappings_size) { for (int i = 0; i < plan.plan(op_idx).meta_data_size(); ++i) { data_types.push_back(plan.plan(op_idx).meta_data(i).type()); } } std::set<int> order_by_keys; if (project_order_by_fusable_beta(plan.plan(op_idx).opr().project(), plan.plan(op_idx + 1).opr().order_by(), ctx_meta, data_types, order_by_keys)) { ContextMeta ret_meta; std::vector<std::function<std::unique_ptr<ProjectExprBase>( const GraphReadInterface& graph, const std::map<std::string, std::string>& params, const Context& ctx)>> exprs; std::set<int> index_set; int first_key = plan.plan(op_idx + 1).opr().order_by().pairs(0).key().tag().id(); int first_idx = -1; std::vector<std::pair<int, std::set<int>>> dependencies; for (int i = 0; i < mappings_size; ++i) { auto& m = plan.plan(op_idx).opr().project().mappings(i); int alias = -1; if (m.has_alias()) { alias = m.alias().value(); } ret_meta.set(alias); if (alias == first_key) { first_idx = i; } if (!m.has_expr()) { LOG(ERROR) << "expr is not set" << m.DebugString(); return std::make_pair(nullptr, ret_meta); } auto expr = m.expr(); std::set<int> dependencies_set; parse_potential_dependencies(expr, dependencies_set); if (!dependencies_set.empty()) { 
dependencies.emplace_back(alias, dependencies_set); } exprs.emplace_back(_make_project_expr(expr, alias, data_types[i])); if (order_by_keys.find(alias) != order_by_keys.end()) { index_set.insert(i); } } auto order_by_opr = plan.plan(op_idx + 1).opr().order_by(); int pair_size = order_by_opr.pairs_size(); std::vector<std::pair<common::Variable, bool>> order_by_pairs; std::tuple<int, int, bool> first_tuple; for (int i = 0; i < pair_size; ++i) { const auto& pair = order_by_opr.pairs(i); if (pair.order() != algebra::OrderBy_OrderingPair_Order:: OrderBy_OrderingPair_Order_ASC && pair.order() != algebra::OrderBy_OrderingPair_Order:: OrderBy_OrderingPair_Order_DESC) { LOG(ERROR) << "order by order is not set" << pair.DebugString(); return std::make_pair(nullptr, ContextMeta()); } bool asc = pair.order() == algebra::OrderBy_OrderingPair_Order::OrderBy_OrderingPair_Order_ASC; order_by_pairs.emplace_back(pair.key(), asc); if (i == 0) { first_tuple = std::make_tuple(first_key, first_idx, asc); if (pair.key().has_property()) { LOG(ERROR) << "key has property" << pair.DebugString(); return std::make_pair(nullptr, ContextMeta()); } } } int lower = 0; int upper = std::numeric_limits<int>::max(); if (order_by_opr.has_limit()) { lower = order_by_opr.limit().lower(); upper = order_by_opr.limit().upper(); } return std::make_pair(std::make_unique<ProjectOrderByOprBeta>( std::move(exprs), dependencies, index_set, order_by_pairs, lower, upper, first_tuple), ret_meta); } else { return std::make_pair(nullptr, ContextMeta()); } } } // namespace ops } // namespace runtime } // namespace gs