in flex/codegen/src/hqps_generator.h [368:607]
// Generates the query body: walks the operators of the physical plan in order
// and appends the code produced for each of them to one string, which is
// returned.
//
// - Root and repartition operators produce no code.
// - An edge expand directly followed by a get_v may be emitted as one fused
//   call (only when FUSE_EDGE_GET_V is set), and a path expand directly
//   followed by a get_v may be fused likewise (only when FUSE_PATH_EXPAND_V is
//   set). In both cases the get_v operator is consumed here and skipped by the
//   loop.
// - Edge expand and apply read the first meta data entry of their operator, so
//   its presence is CHECKed before it is indexed.
// - An operator kind without a case below is reported through LOG(FATAL).
std::string build_query_code() const {
  std::stringstream ss;
  const auto size = plan_.plan_size();
  LOG(INFO) << "Found " << size << " operators in the plan";
  for (int32_t i = 0; i < size; ++i) {
    // Bind by reference: only const accessors are used below, so copying the
    // whole operator message on every iteration is unnecessary.
    const auto& op = plan_.plan(i);
    const auto& meta_datas = op.meta_data();
    const auto& opr = op.opr();
    switch (opr.op_kind_case()) {
    case physical::PhysicalOpr::Operator::kRoot: {
      LOG(INFO) << "Skip root_scan";
      break;
    }
    case physical::PhysicalOpr::Operator::kScan: {  // scan
      // TODO: meta_data is not found in scan
      physical::PhysicalOpr::MetaData meta_data;
      LOG(INFO) << "Found a scan operator";
      auto& scan_op = opr.scan();
      ss << BuildScanOp(ctx_, scan_op, meta_data, schema_) << '\n';
      break;
    }
    case physical::PhysicalOpr::Operator::kEdge: {  // edge expand
      // Both the fused and the plain code path below read meta_datas[0], so
      // fail with a clear message instead of indexing an empty repeated field.
      CHECK(meta_datas.size() > 0)
          << "EdgeExpand operator " << i << " has no meta data";
      // Local copy of the edge expand message, handed to the helpers below.
      physical::EdgeExpand real_edge_expand = opr.edge();
      // try to use information from later operator
      std::vector<LabelT> dst_vertex_labels;
      if (i + 1 < size) {
        auto& get_v_op_opr = plan_.plan(i + 1).opr();
        if (get_v_op_opr.op_kind_case() ==
            physical::PhysicalOpr::Operator::kVertex) {
          auto& get_v_op = get_v_op_opr.vertex();
          extract_vertex_labels(get_v_op, dst_vertex_labels);
          if (FUSE_EDGE_GET_V) {
            if (simple_get_v(get_v_op, real_edge_expand) &&
                intermediate_edge_op(real_edge_expand)) {
              CHECK(dst_vertex_labels.size() > 0);
              VLOG(10) << "When fusing edge+get_v, get_v has labels: "
                       << gs::to_string(dst_vertex_labels);
              build_fused_edge_get_v<LabelT>(ctx_, ss, real_edge_expand,
                                             meta_datas[0], get_v_op,
                                             dst_vertex_labels);
              LOG(INFO) << "Fuse edge expand and get_v since get_v is simple";
              i += 1;  // the get_v operator has been consumed by the fusion
              break;   // leaves the switch; the plain edge expand is skipped
            } else if (intermediate_edge_op(real_edge_expand)) {
              // Not fused: the plain edge expand below still receives the
              // labels extracted from the get_v.
              LOG(INFO) << "try to fuse edge expand with complex get_v, take "
                           "take the get_v' vertex label";
            } else {
              // only fuse get_v label into edge expand
              LOG(INFO)
                  << "Skip fusing edge expand and get_v since simple get v";
            }
          }
        } else {
          LOG(INFO) << "Skip fusing edge expand and get_v since the next "
                       "operator is not get_v";
        }
      } else {
        LOG(INFO) << "EdgeExpand is the last operator";
      }
      auto& meta_data = meta_datas[0];
      LOG(INFO) << "Found a edge expand operator";
      ss << BuildEdgeExpandOp<LabelT>(ctx_, real_edge_expand, meta_data,
                                      dst_vertex_labels)
         << '\n';
      break;
    }
    case physical::PhysicalOpr::Operator::kDedup: {  // dedup
      physical::PhysicalOpr::MetaData meta_data;  // fake meta
      LOG(INFO) << "Found a dedup operator";
      auto& dedup_op = opr.dedup();
      ss << BuildDedupOp(ctx_, dedup_op, meta_data) << '\n';
      break;
    }
    case physical::PhysicalOpr::Operator::kProject: {  // project
      // project op can result into multiple meta data; only the first one (if
      // any) is passed on, otherwise a default-constructed one is used.
      LOG(INFO) << "Found a project operator";
      auto& project_op = opr.project();
      physical::PhysicalOpr_MetaData meta_data;
      if (meta_datas.size() > 0) {
        meta_data = meta_datas[0];
      }
      std::string call_project_code;
      call_project_code = BuildProjectOp(ctx_, project_op, meta_data);
      ss << call_project_code;
      break;
    }
    case physical::PhysicalOpr::Operator::kSelect: {
      physical::PhysicalOpr::MetaData meta_data;
      LOG(INFO) << "Found a select operator";
      auto& select_op = opr.select();
      std::string select_code = BuildSelectOp(ctx_, select_op, meta_data);
      ss << select_code << '\n';
      break;
    }
    case physical::PhysicalOpr::Operator::kVertex: {
      physical::PhysicalOpr::MetaData meta_data;
      LOG(INFO) << "Found a get v operator";
      auto& get_v_op = opr.vertex();
      auto get_v_code = BuildGetVOp<LabelT>(ctx_, get_v_op, meta_data);
      // first output code can be empty, just ignore
      ss << get_v_code;
      break;
    }
    case physical::PhysicalOpr::Operator::kGroupBy: {
      // meta_data is currently not used in groupby.
      physical::PhysicalOpr::MetaData meta_data;
      auto& group_by_op = opr.group_by();
      if (group_by_op.mappings_size() > 0) {
        LOG(INFO) << "Found a group by operator";
        auto code_lines = BuildGroupByOp(ctx_, group_by_op, meta_data);
        ss << code_lines;
      } else {
        LOG(INFO) << "Found a group by operator with no group by keys";
        auto code_lines =
            BuildGroupWithoutKeyOp(ctx_, group_by_op, meta_data);
        ss << code_lines;
      }
      LOG(INFO) << "Finish groupby operator gen";
      break;
    }
    // Path Expand + GetV shall be always fused.
    case physical::PhysicalOpr::Operator::kPath: {
      physical::PhysicalOpr::MetaData meta_data;
      LOG(INFO) << "Found a path operator";
      auto& path_op = opr.path();
      // Fusion is attempted only when the path itself has no alias and there
      // is a following operator to inspect.
      if (FUSE_PATH_EXPAND_V && !path_op.has_alias() && (i + 1 < size)) {
        auto& next_op = plan_.plan(i + 1).opr();
        if (next_op.op_kind_case() ==
            physical::PhysicalOpr::Operator::kVertex) {
          LOG(INFO) << " Fusing path expand and get_v";
          auto& get_v_op = next_op.vertex();
          // -1 is passed on when the get_v carries no alias.
          int32_t get_v_res_alias = -1;
          if (get_v_op.has_alias()) {
            get_v_res_alias = get_v_op.alias().value();
          }
          auto res = BuildPathExpandVOp<LabelT>(ctx_, path_op, meta_datas,
                                                get_v_res_alias);
          ss << res;
          i += 1;  // jump one step
          break;
        }
      }
      LOG(INFO) << " PathExpand to Path";
      // otherwise, just expand path
      auto res = BuildPathExpandPathOp<LabelT>(ctx_, path_op, meta_datas);
      ss << res;
      break;
    }
    case physical::PhysicalOpr::Operator::kApply: {
      // Apply reads meta_datas[0]; make sure it exists before indexing.
      CHECK(meta_datas.size() > 0)
          << "Apply operator " << i << " has no meta data";
      auto& meta_data = meta_datas[0];
      LOG(INFO) << "Found a apply operator";
      auto& apply_op = opr.apply();
      std::string call_apply_code =
          BuildApplyOp<LabelT>(ctx_, apply_op, meta_data);
      ss << call_apply_code << '\n';
      break;
    }
    case physical::PhysicalOpr::Operator::kJoin: {
      LOG(INFO) << "Found a join operator";
      auto& join_op = opr.join();
      auto join_opt_code = BuildJoinOp<LabelT>(ctx_, join_op);
      for (auto& line : join_opt_code) {
        ss << line << '\n';
      }
      break;
    }
    case physical::PhysicalOpr::Operator::kIntersect: {
      LOG(INFO) << "Found a intersect operator";
      // Note that intersect operator will not be followed by unfold anymore.
      auto& intersect_op = opr.intersect();
      auto intersect_opt_code = BuildIntersectOp<LabelT>(ctx_, intersect_op);
      for (auto& line : intersect_opt_code) {
        ss << line << '\n';
      }
      break;
    }
    case physical::PhysicalOpr::Operator::kOrderBy: {
      physical::PhysicalOpr::MetaData meta_data;
      LOG(INFO) << "Found a order by operator";
      auto& order_by_op = opr.order_by();
      std::string sort_code = BuildSortOp(ctx_, order_by_op, meta_data);
      ss << sort_code << '\n';
      break;
    }
    case physical::PhysicalOpr::Operator::kSink: {
      physical::PhysicalOpr::MetaData meta_data;
      LOG(INFO) << "Found a sink operator";
      auto& sink_op = opr.sink();
      std::string call_sink_code = BuildSinkOp(ctx_, sink_op, meta_data);
      ss << call_sink_code << '\n';
      break;
    }
    case physical::PhysicalOpr::Operator::kRepartition: {
      LOG(INFO) << "Found a repartition operator, just ignore";
      break;
    }
    case physical::PhysicalOpr::Operator::kLimit: {
      LOG(INFO) << "Found a limit operator";
      auto& limit_op = opr.limit();
      std::string limit_code = BuildLimitOp(ctx_, limit_op);
      ss << limit_code << '\n';
      break;
    }
    default:
      LOG(FATAL) << "Unsupported operator type: " << opr.op_kind_case();
    }
  }
  LOG(INFO) << "Finish adding query";
  return ss.str();
}