in flex/codegen/src/pegasus_generator.h [151:346]
void addQueryBody(std::stringstream& ss) const {
auto size = plan_.plan_size();
LOG(INFO) << "Found " << size << " operators in the plan";
ss << "Box::new(move |input: &mut Source<i32>, output: ResultSink<String>| "
"{\n";
ss << "let worker_id = input.get_worker_index() % workers;\n";
ss << "let stream_0 = input.input_from(vec![0])?;\n";
std::string plan_json;
google::protobuf::util::JsonPrintOptions option;
option.always_print_primitive_fields = true;
auto st =
google::protobuf::util::MessageToJsonString(plan_, &plan_json, option);
for (auto i = 0; i < size; ++i) {
auto op = plan_.plan(i);
LOG(INFO) << "Start codegen for operator " << i;
auto& meta_datas = op.meta_data();
// CHECK(meta_datas.size() == 1) << "meta data size: " <<
// meta_datas.size();
// physical::PhysicalOpr::MetaData meta_data; //fake meta
auto opr = op.opr();
LOG(INFO) << "Input size of current operator is " << ctx_.InputSize();
switch (opr.op_kind_case()) {
case physical::PhysicalOpr::Operator::kScan: { // scan
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a scan operator";
auto& scan_op = opr.scan();
auto scan_codegen =
pegasus::BuildScanOp(ctx_, i + 1, scan_op, meta_data);
LOG(INFO) << scan_codegen;
ss << scan_codegen;
break;
}
case physical::PhysicalOpr::Operator::kRepartition: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a repartition operator";
auto& repartition_op = opr.repartition();
auto repartition_codegen =
pegasus::BuildRepartitionOp(ctx_, i + 1, repartition_op, meta_data);
LOG(INFO) << repartition_codegen;
ss << repartition_codegen;
break;
}
case physical::PhysicalOpr::Operator::kGroupBy: {
std::vector<physical::PhysicalOpr::MetaData> meta_datas;
for (auto i = 0; i < op.meta_data_size(); i++) {
meta_datas.push_back(op.meta_data(i));
}
LOG(INFO) << "Found a groupby operator";
auto& groupby_op = opr.group_by();
ss << pegasus::BuildGroupByOp(ctx_, i + 1, groupby_op, meta_datas);
break;
}
case physical::PhysicalOpr::Operator::kOrderBy: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a order_by operator";
auto& orderby_op = opr.order_by();
ss << pegasus::BuildOrderByOp(ctx_, i + 1, orderby_op, meta_data);
break;
}
case physical::PhysicalOpr::Operator::kProject: {
std::vector<physical::PhysicalOpr::MetaData> meta_data;
for (auto i = 0; i < op.meta_data_size(); i++) {
meta_data.push_back(op.meta_data(i));
}
LOG(INFO) << "Found a project operator";
auto& project_op = opr.project();
ss << pegasus::BuildProjectOp(ctx_, i + 1, project_op, meta_data);
break;
}
case physical::PhysicalOpr::Operator::kEdge: { // edge expand
auto& meta_data = meta_datas[0];
LOG(INFO) << "Found a edge expand operator";
auto& edge_op = opr.edge();
auto edge_codegen = pegasus::BuildEdgeExpandOp<int32_t>(
ctx_, i + 1, edge_op, meta_data);
LOG(INFO) << edge_codegen;
ss << edge_codegen;
break;
}
case physical::PhysicalOpr::Operator::kVertex: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a get_v operator";
auto& vertex_op = opr.vertex();
auto vertex_codegen =
pegasus::BuildGetVOp<uint8_t>(ctx_, i + 1, vertex_op, meta_data);
LOG(INFO) << vertex_codegen;
ss << vertex_codegen;
break;
}
case physical::PhysicalOpr::Operator::kSink: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a sink operator";
auto& sink_op = opr.sink();
std::string call_sink_code =
pegasus::BuildSinkOp(ctx_, i + 1, sink_op, meta_data);
ss << call_sink_code;
break;
}
case physical::PhysicalOpr::Operator::kPath: {
auto& meta_data = meta_datas[0];
LOG(INFO) << "Found a path expand operator";
auto& path_op = opr.path();
auto path_expand_codegen =
pegasus::BuildPathExpandOp<int32_t>(ctx_, path_op, meta_data);
LOG(INFO) << path_expand_codegen;
ss << path_expand_codegen;
break;
}
case physical::PhysicalOpr::Operator::kIntersect: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a intersect operator";
auto& intersect_op = opr.intersect();
auto intersect_codegen =
pegasus::BuildIntersectOp(ctx_, intersect_op, meta_data);
LOG(INFO) << intersect_codegen;
ss << intersect_codegen;
break;
}
case physical::PhysicalOpr::Operator::kUnfold: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a unfold operator";
auto& unfold_op = opr.unfold();
auto unfold_codegen =
pegasus::BuildUnfoldOp(ctx_, i + 1, unfold_op, meta_data);
LOG(INFO) << unfold_codegen;
ss << unfold_codegen;
break;
}
case physical::PhysicalOpr::Operator::kDedup: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a dedup operator";
auto& dedup_op = opr.dedup();
auto dedup_codegen =
pegasus::BuildDedupOp(ctx_, i + 1, dedup_op, meta_data);
LOG(INFO) << dedup_codegen;
ss << dedup_codegen;
break;
}
case physical::PhysicalOpr::Operator::kUnion: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a union operator";
auto& union_op = opr.union_();
auto union_codegen =
pegasus::BuildUnionOp(ctx_, i + 1, union_op, meta_data);
LOG(INFO) << union_codegen;
ss << union_codegen;
break;
}
case physical::PhysicalOpr::Operator::kJoin: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a join operator";
auto& join_op = opr.join();
auto join_codegen =
pegasus::BuildJoinOp(ctx_, i + 1, join_op, meta_data);
LOG(INFO) << join_codegen;
ss << join_codegen;
break;
}
case physical::PhysicalOpr::Operator::kSelect: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a select operator";
auto& select_op = opr.select();
auto select_codegen =
pegasus::BuildSelectOp(ctx_, i + 1, select_op, meta_data);
LOG(INFO) << select_codegen;
ss << select_codegen;
break;
}
case physical::PhysicalOpr::Operator::kLimit: {
physical::PhysicalOpr::MetaData meta_data;
LOG(INFO) << "Found a select operator";
auto& limit_pb = opr.limit();
auto limit_codegen =
pegasus::BuildLimitOp(ctx_, i + 1, limit_pb, meta_data);
LOG(INFO) << limit_codegen;
ss << limit_codegen;
break;
}
default:
LOG(FATAL) << "Unsupported operator type: " << opr.op_kind_case();
}
}
LOG(INFO) << "Finish adding query";
}