flex/engines/graph_db/runtime/execute/plan_parser.cc (297 lines of code) (raw):
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flex/engines/graph_db/runtime/execute/plan_parser.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/dedup.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/edge.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/group_by.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/intersect.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/join.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/limit.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/order_by.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/path.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/procedure_call.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/project.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/scan.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/select.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/sink.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/unfold.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/union.h"
#include "flex/engines/graph_db/runtime/execute/ops/retrieve/vertex.h"
#include "flex/engines/graph_db/runtime/execute/ops/update/dedup.h"
#include "flex/engines/graph_db/runtime/execute/ops/update/load.h"
#include "flex/engines/graph_db/runtime/execute/ops/update/project.h"
#include "flex/engines/graph_db/runtime/execute/ops/update/sink.h"
#include "flex/engines/graph_db/runtime/execute/ops/update/unfold.h"
#include "flex/engines/graph_db/runtime/execute/ops/update/edge.h"
#include "flex/engines/graph_db/runtime/execute/ops/update/scan.h"
#include "flex/engines/graph_db/runtime/execute/ops/update/select.h"
#include "flex/engines/graph_db/runtime/execute/ops/update/set.h"
#include "flex/engines/graph_db/runtime/execute/ops/update/vertex.h"
namespace gs {
namespace runtime {
void PlanParser::init() {
register_read_operator_builder(std::make_unique<ops::ScanOprBuilder>());
register_read_operator_builder(std::make_unique<ops::TCOprBuilder>());
register_read_operator_builder(
std::make_unique<ops::EdgeExpandGetVOprBuilder>());
register_read_operator_builder(std::make_unique<ops::EdgeExpandOprBuilder>());
register_read_operator_builder(std::make_unique<ops::VertexOprBuilder>());
register_read_operator_builder(
std::make_unique<ops::ProjectOrderByOprBuilder>());
register_read_operator_builder(std::make_unique<ops::ProjectOprBuilder>());
register_read_operator_builder(std::make_unique<ops::OrderByOprBuilder>());
register_read_operator_builder(std::make_unique<ops::GroupByOprBuilder>());
register_read_operator_builder(std::make_unique<ops::DedupOprBuilder>());
register_read_operator_builder(std::make_unique<ops::SelectOprBuilder>());
register_read_operator_builder(
std::make_unique<ops::SPOrderByLimitOprBuilder>());
register_read_operator_builder(std::make_unique<ops::SPOprBuilder>());
register_read_operator_builder(
std::make_unique<ops::PathExpandVOprBuilder>());
register_read_operator_builder(std::make_unique<ops::PathExpandOprBuilder>());
register_read_operator_builder(std::make_unique<ops::JoinOprBuilder>());
register_read_operator_builder(std::make_unique<ops::IntersectOprBuilder>());
register_read_operator_builder(std::make_unique<ops::LimitOprBuilder>());
register_read_operator_builder(std::make_unique<ops::UnfoldOprBuilder>());
register_read_operator_builder(std::make_unique<ops::UnionOprBuilder>());
register_read_operator_builder(std::make_unique<ops::SinkOprBuilder>());
register_read_operator_builder(
std::make_unique<ops::ProcedureCallOprBuilder>());
register_write_operator_builder(std::make_unique<ops::LoadOprBuilder>());
register_write_operator_builder(
std::make_unique<ops::DedupInsertOprBuilder>());
register_write_operator_builder(
std::make_unique<ops::ProjectInsertOprBuilder>());
register_write_operator_builder(
std::make_unique<ops::SinkInsertOprBuilder>());
register_write_operator_builder(
std::make_unique<ops::UnfoldInsertOprBuilder>());
register_update_operator_builder(
std::make_unique<ops::UEdgeExpandOprBuilder>());
register_update_operator_builder(std::make_unique<ops::UScanOprBuilder>());
register_update_operator_builder(std::make_unique<ops::USetOprBuilder>());
register_update_operator_builder(std::make_unique<ops::UVertexOprBuilder>());
register_update_operator_builder(std::make_unique<ops::USinkOprBuilder>());
register_update_operator_builder(std::make_unique<ops::UProjectOprBuilder>());
register_update_operator_builder(std::make_unique<ops::USelectOprBuilder>());
}
// Returns the process-wide PlanParser.
//
// The instance is a function-local static, so it is constructed the first
// time this function is called. This accessor does not call init(); builder
// registration has to be triggered separately.
PlanParser& PlanParser::get() {
  static PlanParser instance;
  return instance;
}
// Registers a read operator builder under the first op kind of the pattern
// it reports via GetOpKinds().
//
// Builders with the same leading kind are appended to the same list, so the
// order of registration is the order in which they are later tried.
// NOTE(review): the pattern is dereferenced at begin() without an emptiness
// check — this assumes every builder reports at least one op kind; confirm.
void PlanParser::register_read_operator_builder(
    std::unique_ptr<IReadOperatorBuilder>&& builder) {
  auto pattern = builder->GetOpKinds();
  const auto leading_kind = *pattern.begin();
  auto& candidates = read_op_builders_[leading_kind];
  // The pattern is stored next to the builder so the parser can match the
  // following operators without calling back into the builder.
  candidates.emplace_back(pattern, std::move(builder));
}
// Registers an insert operator builder under the single op kind it reports
// via GetOpKind(). Registering a second builder for the same kind replaces
// the previously stored one.
void PlanParser::register_write_operator_builder(
    std::unique_ptr<IInsertOperatorBuilder>&& builder) {
  // Read the key first: the builder is moved from in the next statement.
  const auto handled_kind = builder->GetOpKind();
  write_op_builders_[handled_kind] = std::move(builder);
}
// Registers an update operator builder under the single op kind it reports
// via GetOpKind(). Registering a second builder for the same kind replaces
// the previously stored one.
void PlanParser::register_update_operator_builder(
    std::unique_ptr<IUpdateOperatorBuilder>&& builder) {
  // Read the key first: the builder is moved from in the next statement.
  const auto handled_kind = builder->GetOpKind();
  update_op_builders_[handled_kind] = std::move(builder);
}
#if 1
// Maps a physical operator kind to the short name used in this file's
// parse-failure messages. Kinds without an explicit entry map to "unknown".
static std::string get_opr_name(
    physical::PhysicalOpr_Operator::OpKindCase op_kind) {
  using OpKind = physical::PhysicalOpr_Operator::OpKindCase;
  const char* name = "unknown";
  switch (op_kind) {
  case OpKind::kScan:
    name = "scan";
    break;
  case OpKind::kEdge:
    name = "edge_expand";
    break;
  case OpKind::kVertex:
    name = "get_v";
    break;
  case OpKind::kOrderBy:
    name = "order_by";
    break;
  case OpKind::kProject:
    name = "project";
    break;
  case OpKind::kSink:
    name = "sink";
    break;
  case OpKind::kDedup:
    name = "dedup";
    break;
  case OpKind::kGroupBy:
    name = "group_by";
    break;
  case OpKind::kSelect:
    name = "select";
    break;
  case OpKind::kPath:
    name = "path";
    break;
  case OpKind::kJoin:
    name = "join";
    break;
  case OpKind::kRoot:
    name = "root";
    break;
  case OpKind::kIntersect:
    name = "intersect";
    break;
  case OpKind::kUnion:
    name = "union";
    break;
  case OpKind::kUnfold:
    name = "unfold";
    break;
  default:
    break;
  }
  return name;
}
#endif
// Builds a read pipeline from a physical plan and returns it together with
// the context metadata left after the last built operator.
//
// The plan is walked front to back. At each position the builders registered
// under the current op kind are tried in registration order; a builder is a
// candidate when its op-kind pattern fits in the remaining plan and matches
// the op kinds that follow. The first candidate that yields a non-null
// operator wins: its operator is appended, its context metadata becomes the
// current one, and the cursor moves to builder->stepping(i). kRoot entries
// are skipped without building anything.
//
// @param schema    graph schema forwarded to the builders.
// @param ctx_meta  context metadata the first operator starts from.
// @param plan      physical plan to translate.
// @return the pipeline and the final context metadata, or an INTERNAL_ERROR
//         gs::Status (also logged) when no builder advances the cursor at
//         some plan index. The message includes the last error reported by a
//         matching builder.
bl::result<std::pair<ReadPipeline, ContextMeta>>
PlanParser::parse_read_pipeline_with_meta(const gs::Schema& schema,
                                          const ContextMeta& ctx_meta,
                                          const physical::PhysicalPlan& plan) {
  const int opr_num = plan.plan_size();
  std::vector<std::unique_ptr<IReadOperator>> operators;
  ContextMeta cur_ctx_meta = ctx_meta;
  for (int i = 0; i < opr_num;) {
    const physical::PhysicalOpr_Operator::OpKindCase cur_op_kind =
        plan.plan(i).opr().op_kind_case();
    if (cur_op_kind == physical::PhysicalOpr_Operator::OpKindCase::kRoot) {
      ++i;
      continue;
    }
    // NOTE(review): operator[] is used for the lookup; if read_op_builders_
    // is a map this inserts an empty entry for an unregistered kind. The
    // empty list then falls through to the parse-failure path below.
    auto& builders = read_op_builders_[cur_op_kind];
    const int old_i = i;
    gs::Status status = gs::Status::OK();
    for (auto& pair : builders) {
      // Bind by reference: the pattern is only read, no need to copy the
      // vector for every candidate.
      const auto& pattern = pair.first;
      auto& builder = pair.second;
      if (pattern.size() > static_cast<size_t>(opr_num - i)) {
        continue;
      }
      // pattern[0] is the kind the builder was registered under, so only the
      // trailing kinds need to be compared.
      bool match = true;
      for (size_t j = 1; j < pattern.size(); ++j) {
        if (plan.plan(i + j).opr().op_kind_case() != pattern[j]) {
          match = false;
          break;
        }
      }
      if (!match) {
        continue;
      }

      // Set by the error handlers below. Each handler converts the failure
      // into a "successful" result holding a null operator, so this flag is
      // the only way to tell "builder reported an error" apart from "builder
      // returned null without one".
      bool error_staged = false;
      bl::result<ReadOpBuildResultT> res_pair_status = bl::try_handle_some(
          [&builder, &schema, &cur_ctx_meta, &plan,
           &i]() -> bl::result<ReadOpBuildResultT> {
            return builder->Build(schema, cur_ctx_meta, plan, i);
          },
          [&status, &error_staged](const gs::Status& err) {
            status = err;
            error_staged = true;
            return ReadOpBuildResultT(nullptr, ContextMeta());
          },
          [&](const bl::error_info& err) {
            // exception() is only meaningful while an exception is being
            // handled and may still be null; never dereference it blindly.
            const std::exception* ex =
                err.exception_caught() ? err.exception() : nullptr;
            status =
                gs::Status(gs::StatusCode::INTERNAL_ERROR,
                           "Error: " + std::to_string(err.error().value()) +
                               ", Exception: " +
                               (ex != nullptr ? ex->what() : "<none>"));
            error_staged = true;
            return ReadOpBuildResultT(nullptr, ContextMeta());
          },
          [&]() {
            status = gs::Status(gs::StatusCode::UNKNOWN, "Unknown error");
            error_staged = true;
            return ReadOpBuildResultT(nullptr, ContextMeta());
          });
      if (res_pair_status) {
        auto& opr = res_pair_status.value().first;
        auto& new_ctx_meta = res_pair_status.value().second;
        if (opr) {
          operators.emplace_back(std::move(opr));
          cur_ctx_meta = new_ctx_meta;
          i = builder->stepping(i);
          // Reset status to OK after a successful match.
          status = gs::Status::OK();
          break;
        }
        // A null operator means this builder failed. Keep the detailed error
        // a handler already staged; only fall back to the generic message
        // when the builder returned null without reporting anything.
        if (!error_staged) {
          status = gs::Status(gs::StatusCode::INTERNAL_ERROR,
                              "Failed to build operator at index " +
                                  std::to_string(i) +
                                  ", op_kind: " + get_opr_name(cur_op_kind));
        }
      }
    }
    if (i == old_i) {
      // No builder advanced the cursor: the plan cannot be parsed.
      std::stringstream ss;
      ss << "[Parse Failed] " << get_opr_name(cur_op_kind)
         << " failed to parse plan at index " << i << " "
         << plan.plan(i).DebugString() << ": "
         << ", last match error: " << status.ToString();
      auto err = gs::Status(gs::StatusCode::INTERNAL_ERROR, ss.str());
      LOG(ERROR) << err.ToString();
      return bl::new_error(err);
    }
  }
  return std::make_pair(ReadPipeline(std::move(operators)), cur_ctx_meta);
}
// Builds a read pipeline from a physical plan.
//
// Delegates to parse_read_pipeline_with_meta and keeps only the pipeline,
// dropping the resulting context metadata. A parse failure is propagated to
// the caller unchanged.
bl::result<ReadPipeline> PlanParser::parse_read_pipeline(
    const gs::Schema& schema, const ContextMeta& ctx_meta,
    const physical::PhysicalPlan& plan) {
  auto parsed = parse_read_pipeline_with_meta(schema, ctx_meta, plan);
  if (parsed) {
    return std::move(parsed.value().first);
  }
  return parsed.error();
}
// Builds an insert pipeline from a physical plan.
//
// Each plan entry is translated by the insert builder registered for its op
// kind, one operator per entry, in plan order.
//
// @param schema  graph schema forwarded to the builders.
// @param plan    physical plan to translate.
// @return the pipeline, or an INTERNAL_ERROR gs::Status when an op kind has
//         no registered insert builder or a builder returns a null operator.
bl::result<InsertPipeline> PlanParser::parse_write_pipeline(
    const gs::Schema& schema, const physical::PhysicalPlan& plan) {
  std::vector<std::unique_ptr<IInsertOperator>> operators;
  for (int i = 0; i < plan.plan_size(); ++i) {
    auto op_kind = plan.plan(i).opr().op_kind_case();
    // Single lookup; the iterator is reused for the Build call below.
    const auto builder_it = write_op_builders_.find(op_kind);
    if (builder_it == write_op_builders_.end()) {
      std::stringstream ss;
      ss << "[Parse Failed] " << get_opr_name(op_kind)
         << " failed to parse plan at index " << i;
      auto err = gs::Status(gs::StatusCode::INTERNAL_ERROR, ss.str());
      // Deliberately not logged: parse_update_pipeline probes this function
      // first and falls back to the update builders when it fails, so a
      // missing insert builder is an expected outcome on that path.
      return bl::new_error(err);
    }
    auto op = builder_it->second->Build(schema, plan, i);
    if (!op) {
      std::stringstream ss;
      // Same "[Parse Failed] " prefix (with the separating space) as the
      // lookup-failure message above.
      ss << "[Parse Failed] " << get_opr_name(op_kind)
         << " failed to parse plan at index " << i;
      auto err = gs::Status(gs::StatusCode::INTERNAL_ERROR, ss.str());
      LOG(ERROR) << err.ToString();
      return bl::new_error(err);
    }
    operators.emplace_back(std::move(op));
  }
  return InsertPipeline(std::move(operators));
}
// Builds an update pipeline from a physical plan.
//
// The plan is first parsed as a pure insert pipeline; if that succeeds the
// insert pipeline is wrapped and returned. Otherwise every plan entry is
// translated by the update builder registered for its op kind, one operator
// per entry, in plan order.
//
// @param schema  graph schema forwarded to the builders.
// @param plan    physical plan to translate.
// @return the pipeline, or an INTERNAL_ERROR gs::Status (also logged) when
//         an op kind has no registered update builder or a builder returns a
//         null operator.
bl::result<UpdatePipeline> PlanParser::parse_update_pipeline(
    const gs::Schema& schema, const physical::PhysicalPlan& plan) {
  auto res = parse_write_pipeline(schema, plan);
  // insert pipeline
  if (res) {
    return UpdatePipeline(std::move(res.value()));
  }
  std::vector<std::unique_ptr<IUpdateOperator>> operators;
  for (int i = 0; i < plan.plan_size(); ++i) {
    auto op_kind = plan.plan(i).opr().op_kind_case();
    // Single lookup; the iterator is reused for the Build call below.
    const auto builder_it = update_op_builders_.find(op_kind);
    if (builder_it == update_op_builders_.end()) {
      std::stringstream ss;
      ss << "[Parse Failed] " << get_opr_name(op_kind)
         << " failed to parse plan at index " << i;
      auto err = gs::Status(gs::StatusCode::INTERNAL_ERROR, ss.str());
      LOG(ERROR) << err.ToString();
      return bl::new_error(err);
    }
    auto op = builder_it->second->Build(schema, plan, i);
    if (!op) {
      std::stringstream ss;
      // Same "[Parse Failed] " prefix (with the separating space) as the
      // lookup-failure message above.
      ss << "[Parse Failed] " << get_opr_name(op_kind)
         << " failed to parse plan at index " << i;
      auto err = gs::Status(gs::StatusCode::INTERNAL_ERROR, ss.str());
      LOG(ERROR) << err.ToString();
      return bl::new_error(err);
    }
    operators.emplace_back(std::move(op));
  }
  return UpdatePipeline(std::move(operators));
}
} // namespace runtime
} // namespace gs