void addQueryBody()

in flex/codegen/src/pegasus_generator.h [151:346]


  void addQueryBody(std::stringstream& ss) const {
    auto size = plan_.plan_size();

    LOG(INFO) << "Found " << size << " operators in the plan";
    ss << "Box::new(move |input: &mut Source<i32>, output: ResultSink<String>| "
          "{\n";
    ss << "let worker_id = input.get_worker_index() % workers;\n";
    ss << "let stream_0 = input.input_from(vec![0])?;\n";

    std::string plan_json;
    google::protobuf::util::JsonPrintOptions option;
    option.always_print_primitive_fields = true;
    auto st =
        google::protobuf::util::MessageToJsonString(plan_, &plan_json, option);
    for (auto i = 0; i < size; ++i) {
      auto op = plan_.plan(i);
      LOG(INFO) << "Start codegen for operator " << i;
      auto& meta_datas = op.meta_data();
      // CHECK(meta_datas.size() == 1) << "meta data size: " <<
      // meta_datas.size();
      // physical::PhysicalOpr::MetaData meta_data; //fake meta
      auto opr = op.opr();
      LOG(INFO) << "Input size of current operator is " << ctx_.InputSize();
      switch (opr.op_kind_case()) {
      case physical::PhysicalOpr::Operator::kScan: {  // scan
        physical::PhysicalOpr::MetaData meta_data;

        LOG(INFO) << "Found a scan operator";
        auto& scan_op = opr.scan();
        auto scan_codegen =
            pegasus::BuildScanOp(ctx_, i + 1, scan_op, meta_data);
        LOG(INFO) << scan_codegen;
        ss << scan_codegen;
        break;
      }
      case physical::PhysicalOpr::Operator::kRepartition: {
        physical::PhysicalOpr::MetaData meta_data;

        LOG(INFO) << "Found a repartition operator";
        auto& repartition_op = opr.repartition();
        auto repartition_codegen =
            pegasus::BuildRepartitionOp(ctx_, i + 1, repartition_op, meta_data);
        LOG(INFO) << repartition_codegen;
        ss << repartition_codegen;
        break;
      }
      case physical::PhysicalOpr::Operator::kGroupBy: {
        std::vector<physical::PhysicalOpr::MetaData> meta_datas;
        for (auto i = 0; i < op.meta_data_size(); i++) {
          meta_datas.push_back(op.meta_data(i));
        }

        LOG(INFO) << "Found a groupby operator";
        auto& groupby_op = opr.group_by();

        ss << pegasus::BuildGroupByOp(ctx_, i + 1, groupby_op, meta_datas);
        break;
      }
      case physical::PhysicalOpr::Operator::kOrderBy: {
        physical::PhysicalOpr::MetaData meta_data;

        LOG(INFO) << "Found a order_by operator";
        auto& orderby_op = opr.order_by();

        ss << pegasus::BuildOrderByOp(ctx_, i + 1, orderby_op, meta_data);
        break;
      }
      case physical::PhysicalOpr::Operator::kProject: {
        std::vector<physical::PhysicalOpr::MetaData> meta_data;
        for (auto i = 0; i < op.meta_data_size(); i++) {
          meta_data.push_back(op.meta_data(i));
        }

        LOG(INFO) << "Found a project operator";
        auto& project_op = opr.project();

        ss << pegasus::BuildProjectOp(ctx_, i + 1, project_op, meta_data);
        break;
      }
      case physical::PhysicalOpr::Operator::kEdge: {  // edge expand
        auto& meta_data = meta_datas[0];
        LOG(INFO) << "Found a edge expand operator";
        auto& edge_op = opr.edge();
        auto edge_codegen = pegasus::BuildEdgeExpandOp<int32_t>(
            ctx_, i + 1, edge_op, meta_data);
        LOG(INFO) << edge_codegen;
        ss << edge_codegen;
        break;
      }
      case physical::PhysicalOpr::Operator::kVertex: {
        physical::PhysicalOpr::MetaData meta_data;

        LOG(INFO) << "Found a get_v operator";
        auto& vertex_op = opr.vertex();
        auto vertex_codegen =
            pegasus::BuildGetVOp<uint8_t>(ctx_, i + 1, vertex_op, meta_data);
        LOG(INFO) << vertex_codegen;
        ss << vertex_codegen;

        break;
      }
      case physical::PhysicalOpr::Operator::kSink: {
        physical::PhysicalOpr::MetaData meta_data;
        LOG(INFO) << "Found a sink operator";
        auto& sink_op = opr.sink();
        std::string call_sink_code =
            pegasus::BuildSinkOp(ctx_, i + 1, sink_op, meta_data);
        ss << call_sink_code;
        break;
      }
      case physical::PhysicalOpr::Operator::kPath: {
        auto& meta_data = meta_datas[0];
        LOG(INFO) << "Found a path expand operator";
        auto& path_op = opr.path();
        auto path_expand_codegen =
            pegasus::BuildPathExpandOp<int32_t>(ctx_, path_op, meta_data);
        LOG(INFO) << path_expand_codegen;
        ss << path_expand_codegen;
        break;
      }
      case physical::PhysicalOpr::Operator::kIntersect: {
        physical::PhysicalOpr::MetaData meta_data;
        LOG(INFO) << "Found a intersect operator";
        auto& intersect_op = opr.intersect();
        auto intersect_codegen =
            pegasus::BuildIntersectOp(ctx_, intersect_op, meta_data);
        LOG(INFO) << intersect_codegen;
        ss << intersect_codegen;
        break;
      }
      case physical::PhysicalOpr::Operator::kUnfold: {
        physical::PhysicalOpr::MetaData meta_data;
        LOG(INFO) << "Found a unfold operator";
        auto& unfold_op = opr.unfold();
        auto unfold_codegen =
            pegasus::BuildUnfoldOp(ctx_, i + 1, unfold_op, meta_data);
        LOG(INFO) << unfold_codegen;
        ss << unfold_codegen;
        break;
      }
      case physical::PhysicalOpr::Operator::kDedup: {
        physical::PhysicalOpr::MetaData meta_data;
        LOG(INFO) << "Found a dedup operator";
        auto& dedup_op = opr.dedup();
        auto dedup_codegen =
            pegasus::BuildDedupOp(ctx_, i + 1, dedup_op, meta_data);
        LOG(INFO) << dedup_codegen;
        ss << dedup_codegen;
        break;
      }
      case physical::PhysicalOpr::Operator::kUnion: {
        physical::PhysicalOpr::MetaData meta_data;
        LOG(INFO) << "Found a union operator";
        auto& union_op = opr.union_();
        auto union_codegen =
            pegasus::BuildUnionOp(ctx_, i + 1, union_op, meta_data);
        LOG(INFO) << union_codegen;
        ss << union_codegen;
        break;
      }
      case physical::PhysicalOpr::Operator::kJoin: {
        physical::PhysicalOpr::MetaData meta_data;
        LOG(INFO) << "Found a join operator";
        auto& join_op = opr.join();
        auto join_codegen =
            pegasus::BuildJoinOp(ctx_, i + 1, join_op, meta_data);
        LOG(INFO) << join_codegen;
        ss << join_codegen;
        break;
      }
      case physical::PhysicalOpr::Operator::kSelect: {
        physical::PhysicalOpr::MetaData meta_data;
        LOG(INFO) << "Found a select operator";
        auto& select_op = opr.select();
        auto select_codegen =
            pegasus::BuildSelectOp(ctx_, i + 1, select_op, meta_data);
        LOG(INFO) << select_codegen;
        ss << select_codegen;
        break;
      }
      case physical::PhysicalOpr::Operator::kLimit: {
        physical::PhysicalOpr::MetaData meta_data;
        LOG(INFO) << "Found a select operator";
        auto& limit_pb = opr.limit();
        auto limit_codegen =
            pegasus::BuildLimitOp(ctx_, i + 1, limit_pb, meta_data);
        LOG(INFO) << limit_codegen;
        ss << limit_codegen;
        break;
      }
      default:
        LOG(FATAL) << "Unsupported operator type: " << opr.op_kind_case();
      }
    }
    LOG(INFO) << "Finish adding query";
  }