in flex/engines/graph_db/runtime/execute/ops/retrieve/group_by.cc [984:1145]
// Builds the read operator for the group-by step stored at `op_idx` of `plan`.
//
// The returned ContextMeta records, in order, the alias of every key mapping
// followed by the alias of every aggregate function (-1 where no alias is
// set).
//
// Parameters:
//   schema, ctx_meta - not used by this builder.
//   plan             - physical plan that owns the group-by operator message.
//   op_idx           - index of the group-by operator inside `plan`.
//
// Returns a pair of (operator, meta):
//   * GroupByOpr      when no key reads a property;
//   * GroupByOprBeta  when at least one key reads a property, in which case a
//                     projection factory is passed in addition to the key and
//                     reducer factories;
//   * a null operator (after logging an error) when a key mapping lacks its
//     key or alias, or when an aggregate function has no variables.
bl::result<ReadOpBuildResultT> GroupByOprBuilder::Build(
    const gs::Schema& schema, const ContextMeta& ctx_meta,
    const physical::PhysicalPlan& plan, int op_idx) {
  // Bind by reference: the message is owned by `plan`, which stays valid for
  // the whole call, so a deep copy of all mappings/functions is unnecessary.
  const auto& opr = plan.plan(op_idx).opr().group_by();
  const int mappings_num = opr.mappings_size();
  const int func_num = opr.functions_size();

  // The meta is fully populated before any validation so that the error
  // paths below return the same meta as the success paths.
  ContextMeta meta;
  for (int i = 0; i < mappings_num; ++i) {
    const auto& key = opr.mappings(i);
    meta.set(key.has_alias() ? key.alias().value() : -1);
  }
  for (int i = 0; i < func_num; ++i) {
    const auto& func = opr.functions(i);
    meta.set(func.has_alias() ? func.alias().value() : -1);
  }

  // One (source column, output alias) pair and one variable per grouping key.
  std::vector<std::pair<int, int>> mappings;
  std::vector<common::Variable> vars;
  mappings.reserve(mappings_num);
  vars.reserve(mappings_num);
  bool has_property = false;
  for (int i = 0; i < mappings_num; ++i) {
    const auto& key = opr.mappings(i);
    if (!key.has_key() || !key.has_alias()) {
      LOG(ERROR) << "key should have key and alias";
      return std::make_pair(nullptr, meta);
    }
    const int tag = key.key().has_tag() ? key.key().tag().id() : -1;
    // The alias is guaranteed to be present by the check above.
    const int alias = key.alias().value();
    if (key.key().has_property()) {
      // Property keys are paired (alias, alias); the projection factory
      // below builds an expression targeting that same alias.
      mappings.emplace_back(alias, alias);
      has_property = true;
    } else {
      mappings.emplace_back(tag, alias);
    }
    vars.emplace_back(key.key());
  }

  // Factory for the projection expressions of property keys. Stays empty
  // (nullptr) when no key reads a property.
  std::function<std::vector<std::unique_ptr<ProjectExprBase>>(
      const GraphReadInterface&, const Context&)>
      make_project_func = nullptr;
  if (has_property) {
    // Captures copies: `vars` and `mappings` are moved into the key factory
    // further down.
    make_project_func = [vars, mappings](const GraphReadInterface& graph,
                                         const Context& ctx)
        -> std::vector<std::unique_ptr<ProjectExprBase>> {
      std::vector<std::unique_ptr<ProjectExprBase>> exprs;
      int idx = 0;
      for (const auto& var : vars) {
        // `idx` advances for every key so that it stays aligned with `vars`,
        // including the keys skipped below.
        auto alias = mappings[idx++].second;
        if (!var.has_property()) {
          continue;
        }
        Var var_(graph, ctx, var, VarType::kPathVar);
        // Only string, int64 and int32 property keys are supported.
        if (var_.type() == RTAnyType::kStringValue) {
          TypedKeyCollector<std::string_view>::TypedKeyWrapper wrapper(
              std::move(var_));
          TypedKeyCollector<std::string_view> collector;
          exprs.emplace_back(
              std::make_unique<
                  ProjectExpr<decltype(wrapper), decltype(collector)>>(
                  std::move(wrapper), std::move(collector), alias));
        } else if (var_.type() == RTAnyType::kI64Value) {
          TypedKeyCollector<int64_t>::TypedKeyWrapper wrapper(std::move(var_));
          TypedKeyCollector<int64_t> collector;
          exprs.emplace_back(
              std::make_unique<
                  ProjectExpr<decltype(wrapper), decltype(collector)>>(
                  std::move(wrapper), std::move(collector), alias));
        } else if (var_.type() == RTAnyType::kI32Value) {
          TypedKeyCollector<int32_t>::TypedKeyWrapper wrapper(std::move(var_));
          TypedKeyCollector<int32_t> collector;
          exprs.emplace_back(
              std::make_unique<
                  ProjectExpr<decltype(wrapper), decltype(collector)>>(
                  std::move(wrapper), std::move(collector), alias));
        } else {
          LOG(FATAL) << "unsupport" << static_cast<int>(var_.type());
        }
      }
      return exprs;
    };
  }

  // Factory for the grouping key. With a single mapping a specialised key is
  // tried first; if that yields nullptr (or there are several mappings) the
  // generic variable-based key is built instead.
  auto make_key_func = [mappings = std::move(mappings), vars = std::move(vars)](
                           const GraphReadInterface& graph,
                           const Context& ctx) -> std::unique_ptr<KeyBase> {
    std::unique_ptr<KeyBase> key = nullptr;
    if (mappings.size() == 1) {
      key = KeyBuilder<1>::make_sp_key(ctx, mappings);
    }
    if (key == nullptr) {
      std::vector<VarWrapper> key_vars;
      key_vars.reserve(vars.size());
      for (const auto& var : vars) {
        Var var_(graph, ctx, var, VarType::kPathVar);
        key_vars.emplace_back(VarWrapper(std::move(var_)));
      }
      key = std::make_unique<GKey<VarWrapper>>(std::move(key_vars), mappings);
    }
    return key;
  };

  // One reducer factory per aggregate function.
  std::vector<std::function<std::unique_ptr<ReducerBase>(
      const GraphReadInterface&, const Context&)>>
      reduces;
  reduces.reserve(func_num);
  // (output alias, input tag) pairs for aggregates applied to a whole tag.
  // NOTE(review): how the operators consume these pairs is not visible here;
  // confirm against GroupByOpr / GroupByOprBeta.
  std::vector<std::pair<int, int>> dependencies;
  for (int i = 0; i < func_num; ++i) {
    const auto& func = opr.functions(i);
    auto aggr_kind = parse_aggregate(func.aggregate());
    const int alias = func.has_alias() ? func.alias().value() : -1;
    if (func.vars_size() == 2) {
      // Two-variable aggregates go through the pair reducer. The lambda
      // captures copies of the variables, so it does not depend on `plan`.
      const auto& fst = func.vars(0);
      const auto& snd = func.vars(1);
      reduces.emplace_back(
          [aggr_kind, alias, fst, snd](
              const GraphReadInterface& graph,
              const Context& ctx) -> std::unique_ptr<ReducerBase> {
            Var fst_var(graph, ctx, fst, VarType::kPathVar);
            Var snd_var(graph, ctx, snd, VarType::kPathVar);
            return make_pair_reducer(graph, ctx, std::move(fst_var),
                                     std::move(snd_var), aggr_kind, alias);
          });
      continue;
    }
    // Indexing vars(0) on an empty repeated field is out of range; reject the
    // plan the same way malformed keys are rejected above.
    if (func.vars_size() < 1) {
      LOG(ERROR) << "aggregate function should have at least one variable";
      return std::make_pair(nullptr, meta);
    }
    const auto& var = func.vars(0);
    const bool tracks_dependency =
        aggr_kind == AggrKind::kToList || aggr_kind == AggrKind::kToSet ||
        aggr_kind == AggrKind::kFirst || aggr_kind == AggrKind::kMin ||
        aggr_kind == AggrKind::kMax;
    if (tracks_dependency && !var.has_property()) {
      const int tag = var.has_tag() ? var.tag().id() : -1;
      dependencies.emplace_back(alias, tag);
    }
    reduces.emplace_back(
        [alias, aggr_kind, var](
            const GraphReadInterface& graph,
            const Context& ctx) -> std::unique_ptr<ReducerBase> {
          return make_reducer(graph, ctx, var, aggr_kind, alias);
        });
  }

  if (!has_property) {
    return std::make_pair(
        std::make_unique<GroupByOpr>(std::move(make_key_func),
                                     std::move(reduces), dependencies),
        meta);
  } else {
    return std::make_pair(
        std::make_unique<GroupByOprBeta>(std::move(make_project_func),
                                         std::move(make_key_func),
                                         std::move(reduces), dependencies),
        meta);
  }
}