bl::result GroupByOprBuilder::Build()

in flex/engines/graph_db/runtime/execute/ops/retrieve/group_by.cc [984:1145]


bl::result<ReadOpBuildResultT> GroupByOprBuilder::Build(
    const gs::Schema& schema, const ContextMeta& ctx_meta,
    const physical::PhysicalPlan& plan, int op_idx) {
  int mappings_num = plan.plan(op_idx).opr().group_by().mappings_size();
  int func_num = plan.plan(op_idx).opr().group_by().functions_size();
  ContextMeta meta;
  for (int i = 0; i < mappings_num; ++i) {
    auto& key = plan.plan(op_idx).opr().group_by().mappings(i);
    if (key.has_alias()) {
      meta.set(key.alias().value());
    } else {
      meta.set(-1);
    }
  }
  for (int i = 0; i < func_num; ++i) {
    auto& func = plan.plan(op_idx).opr().group_by().functions(i);
    if (func.has_alias()) {
      meta.set(func.alias().value());
    } else {
      meta.set(-1);
    }
  }

  auto opr = plan.plan(op_idx).opr().group_by();
  std::vector<std::pair<int, int>> mappings;
  std::vector<common::Variable> vars;
  bool has_property = false;

  for (int i = 0; i < mappings_num; ++i) {
    auto& key = opr.mappings(i);
    if (!key.has_key() || !key.has_alias()) {
      LOG(ERROR) << "key should have key and alias";
      return std::make_pair(nullptr, meta);
    }
    int tag = key.key().has_tag() ? key.key().tag().id() : -1;
    int alias = key.has_alias() ? key.alias().value() : -1;
    if (key.key().has_property()) {
      mappings.emplace_back(alias, alias);
      has_property = true;
    } else {
      mappings.emplace_back(tag, alias);
    }
    vars.emplace_back(key.key());
  }
  std::function<std::vector<std::unique_ptr<ProjectExprBase>>(
      const GraphReadInterface&, const Context&)>
      make_project_func = nullptr;
  if (has_property) {
    make_project_func = [vars, mappings](const GraphReadInterface& graph,
                                         const Context& ctx)
        -> std::vector<std::unique_ptr<ProjectExprBase>> {
      std::vector<std::unique_ptr<ProjectExprBase>> exprs;
      int idx = 0;
      for (const auto& var : vars) {
        auto alias = mappings[idx++].second;
        if (!var.has_property()) {
          continue;
        }

        Var var_(graph, ctx, var, VarType::kPathVar);
        if (var_.type() == RTAnyType::kStringValue) {
          TypedKeyCollector<std::string_view>::TypedKeyWrapper wrapper(
              std::move(var_));
          TypedKeyCollector<std::string_view> collector;
          exprs.emplace_back(
              std::make_unique<
                  ProjectExpr<decltype(wrapper), decltype(collector)>>(
                  std::move(wrapper), std::move(collector), alias));
        } else if (var_.type() == RTAnyType::kI64Value) {
          TypedKeyCollector<int64_t>::TypedKeyWrapper wrapper(std::move(var_));
          TypedKeyCollector<int64_t> collector;
          exprs.emplace_back(
              std::make_unique<
                  ProjectExpr<decltype(wrapper), decltype(collector)>>(
                  std::move(wrapper), std::move(collector), alias));
        } else if (var_.type() == RTAnyType::kI32Value) {
          TypedKeyCollector<int32_t>::TypedKeyWrapper wrapper(std::move(var_));
          TypedKeyCollector<int32_t> collector;
          exprs.emplace_back(
              std::make_unique<
                  ProjectExpr<decltype(wrapper), decltype(collector)>>(
                  std::move(wrapper), std::move(collector), alias));
        } else {
          LOG(FATAL) << "unsupport" << static_cast<int>(var_.type());
        }
      }
      return exprs;
    };
  }
  auto make_key_func = [mappings = std::move(mappings), vars = std::move(vars)](
                           const GraphReadInterface& graph,
                           const Context& ctx) -> std::unique_ptr<KeyBase> {
    std::unique_ptr<KeyBase> key = nullptr;
    if (mappings.size() == 1) {
      key = KeyBuilder<1>::make_sp_key(ctx, mappings);
    }
    if (key == nullptr) {
      std::vector<VarWrapper> key_vars;

      for (const auto& var : vars) {
        Var var_(graph, ctx, var, VarType::kPathVar);
        key_vars.emplace_back(VarWrapper(std::move(var_)));
      }
      key = std::make_unique<GKey<VarWrapper>>(std::move(key_vars), mappings);
    }
    return key;
  };

  std::vector<std::function<std::unique_ptr<ReducerBase>(
      const GraphReadInterface&, const Context&)>>
      reduces;
  std::vector<std::pair<int, int>> dependencies;
  for (int i = 0; i < func_num; ++i) {
    auto& func = opr.functions(i);
    auto aggr_kind = parse_aggregate(func.aggregate());
    int alias = func.has_alias() ? func.alias().value() : -1;
    if (func.vars_size() == 2) {
      auto& fst = func.vars(0);
      auto& snd = func.vars(1);
      reduces.emplace_back(
          [aggr_kind, alias, fst, snd](
              const GraphReadInterface& graph,
              const Context& ctx) -> std::unique_ptr<ReducerBase> {
            Var fst_var(graph, ctx, fst, VarType::kPathVar);
            Var snd_var(graph, ctx, snd, VarType::kPathVar);
            return make_pair_reducer(graph, ctx, std::move(fst_var),
                                     std::move(snd_var), aggr_kind, alias);
          });
      continue;
    }
    auto& var = func.vars(0);
    if (aggr_kind == AggrKind::kToList || aggr_kind == AggrKind::kToSet ||
        aggr_kind == AggrKind::kFirst || aggr_kind == AggrKind::kMin ||
        aggr_kind == AggrKind::kMax) {
      if (!var.has_property()) {
        int tag = var.has_tag() ? var.tag().id() : -1;
        dependencies.emplace_back(alias, tag);
      }
    }

    reduces.emplace_back(
        [alias, aggr_kind, var](
            const GraphReadInterface& graph,
            const Context& ctx) -> std::unique_ptr<ReducerBase> {
          return make_reducer(graph, ctx, var, aggr_kind, alias);
        });
  }
  if (!has_property) {
    return std::make_pair(
        std::make_unique<GroupByOpr>(std::move(make_key_func),
                                     std::move(reduces), dependencies),

        meta);
  } else {
    return std::make_pair(
        std::make_unique<GroupByOprBeta>(std::move(make_project_func),
                                         std::move(make_key_func),
                                         std::move(reduces), dependencies),

        meta);
  }
}