flex/engines/graph_db/runtime/execute/ops/update/project.cc (292 lines of code) (raw):

/** Copyright 2020 Alibaba Group Holding Limited.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "flex/engines/graph_db/runtime/execute/ops/update/project.h"

#include <functional>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "flex/engines/graph_db/runtime/common/operators/update/project.h"
#include "flex/engines/graph_db/runtime/utils/var.h"

namespace gs {
namespace runtime {
namespace ops {

// Insert-path projection operator. It holds one expression factory per
// output column. On every Eval the factories are instantiated against the
// query parameters, and the resulting expressions are applied to the write
// context through Project::project.
class ProjectInsertOpr : public IInsertOperator {
 public:
  // Builds one write-side projection expression from the query parameters.
  using ExprFactory = std::function<std::unique_ptr<WriteProjectExprBase>(
      const std::map<std::string, std::string>&)>;

  explicit ProjectInsertOpr(std::vector<ExprFactory> exprs)
      : exprs_(std::move(exprs)) {}

  std::string get_operator_name() const override { return "ProjectInsertOpr"; }

  // Shared body of both Eval overloads. `graph` and `timer` are not used by
  // this operator; they are accepted only to match the Eval signatures.
  template <typename GraphInterface>
  bl::result<gs::runtime::WriteContext> eval_impl(
      GraphInterface& graph, const std::map<std::string, std::string>& params,
      gs::runtime::WriteContext&& ctx, gs::runtime::OprTimer& timer) {
    std::vector<std::unique_ptr<WriteProjectExprBase>> exprs;
    exprs.reserve(exprs_.size());
    for (const auto& make_expr : exprs_) {
      exprs.push_back(make_expr(params));
    }
    return Project::project(std::move(ctx), exprs);
  }

  bl::result<gs::runtime::WriteContext> Eval(
      gs::runtime::GraphInsertInterface& graph,
      const std::map<std::string, std::string>& params,
      gs::runtime::WriteContext&& ctx, gs::runtime::OprTimer& timer) override {
    return eval_impl(graph, params, std::move(ctx), timer);
  }

  bl::result<gs::runtime::WriteContext> Eval(
      gs::runtime::GraphUpdateInterface& graph,
      const std::map<std::string, std::string>& params,
      gs::runtime::WriteContext&& ctx, gs::runtime::OprTimer& timer) override {
    return eval_impl(graph, params, std::move(ctx), timer);
  }

 private:
  std::vector<ExprFactory> exprs_;
};

namespace {

// Checks that `var` references a tagged column and carries no property
// accessor. On success, stores the tag id in `*tag` and returns true.
// Otherwise it logs the reason and returns false.
template <typename VarT>
bool parse_plain_tag(const VarT& var, int* tag) {
  if (!var.has_tag()) {
    LOG(ERROR) << "project mapping should have tag";
    return false;
  }
  if (var.has_property()) {
    LOG(ERROR) << "project mapping should not have property";
    return false;
  }
  *tag = var.tag().id();
  return true;
}

// Validates a `gs.function.first` / `gs.function.second` call. The call must
// have exactly one argument, that argument must be a single operator, and
// that operator must be a plain tagged variable. The tag is stored in `*tag`.
// `expr` is the enclosing expression and is used only in error logs.
template <typename UdfT>
bool parse_pair_udf_tag(const UdfT& udf_func, const common::Expression& expr,
                        int* tag) {
  if (udf_func.parameters_size() != 1 ||
      udf_func.parameters(0).operators_size() != 1) {
    LOG(ERROR) << "not support for " << expr.DebugString();
    return false;
  }
  const auto& param = udf_func.parameters(0).operators(0);
  if (param.item_case() != common::ExprOpr::kVar) {
    LOG(ERROR) << "not support for " << expr.DebugString();
    return false;
  }
  return parse_plain_tag(param.var(), tag);
}

}  // namespace

// Translates a physical `project` operator into a ProjectInsertOpr. Each
// mapping must carry an alias and a single-operator expression, which must be
// one of:
//   * a query parameter   -> ParamsGetter
//   * a plain tagged var  -> DummyWGetter
//   * gs.function.first   -> PairsFstGetter; it is fused into a single
//                            PairsGetter when the next mapping is
//                            gs.function.second on the same tag
//   * gs.function.second  -> PairsSndGetter
// Any other shape is logged and yields nullptr.
std::unique_ptr<IInsertOperator> ProjectInsertOprBuilder::Build(
    const Schema& schema, const physical::PhysicalPlan& plan, int op_id) {
  const auto& opr = plan.plan(op_id).opr().project();
  const int mappings_size = opr.mappings_size();
  std::vector<ProjectInsertOpr::ExprFactory> exprs;
  exprs.reserve(static_cast<size_t>(mappings_size));
  for (int i = 0; i < mappings_size; ++i) {
    const physical::Project_ExprAlias& m = opr.mappings(i);
    if (!m.has_alias()) {
      LOG(ERROR) << "project mapping should have alias";
      return nullptr;
    }
    if ((!m.has_expr()) || m.expr().operators_size() != 1) {
      LOG(ERROR) << "project mapping should have one expr";
      return nullptr;
    }
    const auto& op = m.expr().operators(0);
    const int alias = m.alias().value();

    if (op.item_case() == common::ExprOpr::kParam) {
      std::string name = op.param().name();
      exprs.emplace_back(
          [name, alias](const std::map<std::string, std::string>& params) {
            // A missing parameter aborts the process (glog CHECK).
            auto it = params.find(name);
            CHECK(it != params.end()) << "missing query parameter: " << name;
            return std::make_unique<ParamsGetter>(it->second, alias);
          });
    } else if (op.item_case() == common::ExprOpr::kVar) {
      int tag = -1;
      if (!parse_plain_tag(op.var(), &tag)) {
        return nullptr;
      }
      exprs.emplace_back(
          [tag, alias](const std::map<std::string, std::string>&) {
            return std::make_unique<DummyWGetter>(tag, alias);
          });
    } else if (op.has_udf_func()) {
      const auto& udf_func = op.udf_func();
      if (udf_func.name() == "gs.function.first") {
        int tag = -1;
        if (!parse_pair_udf_tag(udf_func, m.expr(), &tag)) {
          return nullptr;
        }
        // Try to fuse with an immediately following gs.function.second on
        // the same tag, so both halves of the pair are produced together.
        if (i + 1 < mappings_size) {
          const physical::Project_ExprAlias& next = opr.mappings(i + 1);
          if (!next.has_alias()) {
            LOG(ERROR) << "project mapping should have alias";
            return nullptr;
          }
          if (!next.has_expr()) {
            LOG(ERROR) << "project mapping should have expr";
            return nullptr;
          }
          if (next.expr().operators_size() != 1) {
            LOG(ERROR) << "project mapping should have one expr";
            return nullptr;
          }
          const auto& next_op = next.expr().operators(0);
          if (next_op.has_udf_func() &&
              next_op.udf_func().name() == "gs.function.second") {
            // The tag must come from the *next* call's argument. Reading it
            // from `udf_func` would always match and fuse unrelated tags.
            int next_tag = -1;
            if (!parse_pair_udf_tag(next_op.udf_func(), next.expr(),
                                    &next_tag)) {
              return nullptr;
            }
            if (next_tag == tag) {
              const int next_alias = next.alias().value();
              exprs.emplace_back(
                  [tag, alias,
                   next_alias](const std::map<std::string, std::string>&) {
                    return std::make_unique<PairsGetter>(tag, alias,
                                                         next_alias);
                  });
              ++i;  // the next mapping has been consumed by the fusion
              continue;
            }
          }
        }
        exprs.emplace_back(
            [tag, alias](const std::map<std::string, std::string>&) {
              return std::make_unique<PairsFstGetter>(tag, alias);
            });
      } else if (udf_func.name() == "gs.function.second") {
        int tag = -1;
        if (!parse_pair_udf_tag(udf_func, m.expr(), &tag)) {
          return nullptr;
        }
        exprs.emplace_back(
            [tag, alias](const std::map<std::string, std::string>&) {
              return std::make_unique<PairsSndGetter>(tag, alias);
            });
      } else {
        LOG(ERROR) << "not support for " << m.expr().DebugString();
        return nullptr;
      }
    } else {
      LOG(ERROR) << "not support for " << m.expr().DebugString();
      return nullptr;
    }
  }
  return std::make_unique<ProjectInsertOpr>(std::move(exprs));
}

// Collects the values produced by `EXPR` for row indices into a
// ValueColumnBuilder<T> (via push_back_opt). get() finishes the builder into a
// context column. `ctx_` is stored but not read by the members visible here.
template <typename EXPR, typename T>
struct ValueCollector {
  ValueCollector(const Context& ctx) : ctx_(ctx) {}
  void collect(const EXPR& e, int i) { builder.push_back_opt(e(i)); }
  std::shared_ptr<IContextColumn> get() { return builder.finish(nullptr); }
  const Context& ctx_;
  ValueColumnBuilder<T> builder;
};

// Adapts a Var to a callable that returns the value at row `i` converted to T
// through TypedConverter<T>.
template <typename T>
struct TypedVar {
  TypedVar(Var&& var) : var(std::move(var)) {}
  T operator()(int i) const { return TypedConverter<T>::to_typed(var.get(i)); }
  Var var;
};

// Update-path projection operator. Each mapping must be a single variable
// operator:
//   * without a property: forwarded through UDummyGetter;
//   * with a property of type int64, int32 or string: evaluated as a path
//     variable and collected into a value column.
// Any other shape is rejected with a bad-request error.
class ProjectUpdateOpr : public IUpdateOperator {
 public:
  ProjectUpdateOpr(std::vector<std::pair<common::Expression, int>>&& mappings,
                   bool is_append)
      : mappings_(std::move(mappings)), is_append_(is_append) {}

  std::string get_operator_name() const override { return "ProjectUpdateOpr"; }

  bl::result<gs::runtime::Context> Eval(
      gs::runtime::GraphUpdateInterface& graph,
      const std::map<std::string, std::string>& params,
      gs::runtime::Context&& ctx, gs::runtime::OprTimer& timer) override {
    std::vector<std::unique_ptr<UProjectExprBase>> exprs;
    exprs.reserve(mappings_.size());
    for (const auto& [map, alias] : mappings_) {
      if (map.operators_size() != 1 ||
          map.operators(0).item_case() != common::ExprOpr::kVar) {
        LOG(ERROR) << "project update only support var";
        RETURN_BAD_REQUEST_ERROR("project update only support var");
      }
      const auto& var = map.operators(0).var();
      if (!var.has_property()) {
        // NOTE(review): has_tag() is not checked here; an untagged var
        // yields the protobuf default id.
        exprs.emplace_back(
            std::make_unique<UDummyGetter>(var.tag().id(), alias));
        continue;
      }
      Var prop_var(const_cast<const GraphUpdateInterface&>(graph), ctx, var,
                   VarType::kPathVar);
      if (prop_var.type() == RTAnyType::kI64Value) {
        exprs.emplace_back(
            make_typed_expr<int64_t>(std::move(prop_var), ctx, alias));
      } else if (prop_var.type() == RTAnyType::kStringValue) {
        exprs.emplace_back(
            make_typed_expr<std::string_view>(std::move(prop_var), ctx, alias));
      } else if (prop_var.type() == RTAnyType::kI32Value) {
        exprs.emplace_back(
            make_typed_expr<int32_t>(std::move(prop_var), ctx, alias));
      } else {
        LOG(ERROR) << "project update only support var";
        RETURN_BAD_REQUEST_ERROR("project update only support var");
      }
    }
    return UProject::project(std::move(ctx), exprs, is_append_);
  }

 private:
  // Wraps `var` as a TypedVar<T> getter paired with a ValueCollector for T,
  // and returns the resulting UProjectExpr that writes to column `alias`.
  template <typename T>
  static std::unique_ptr<UProjectExprBase> make_typed_expr(Var&& var,
                                                           const Context& ctx,
                                                           int alias) {
    using Getter = TypedVar<T>;
    using Collector = ValueCollector<Getter, T>;
    return std::make_unique<UProjectExpr<Getter, Collector>>(
        Getter(std::move(var)), Collector(ctx), alias);
  }

  std::vector<std::pair<common::Expression, int>> mappings_;
  bool is_append_;
};

// Translates a physical `project` operator into a ProjectUpdateOpr. A mapping
// without an alias is recorded with alias -1. A mapping without an expression
// is logged and yields nullptr. Expression shapes are validated later, in
// ProjectUpdateOpr::Eval.
std::unique_ptr<IUpdateOperator> UProjectOprBuilder::Build(
    const Schema& schema, const physical::PhysicalPlan& plan, int op_id) {
  const auto& project = plan.plan(op_id).opr().project();
  const bool is_append = project.is_append();
  const int mappings_size = project.mappings_size();
  std::vector<std::pair<common::Expression, int>> mappings;
  mappings.reserve(static_cast<size_t>(mappings_size));
  for (int i = 0; i < mappings_size; ++i) {
    const auto& mapping = project.mappings(i);
    const int alias = mapping.has_alias() ? mapping.alias().value() : -1;
    if (!mapping.has_expr()) {
      LOG(ERROR) << "project mapping should have expr";
      return nullptr;
    }
    mappings.emplace_back(common::Expression(mapping.expr()), alias);
  }
  return std::make_unique<ProjectUpdateOpr>(std::move(mappings), is_append);
}

}  // namespace ops
}  // namespace runtime
}  // namespace gs