in flex/engines/graph_db/runtime/common/operators/retrieve/path_expand.cc [231:404]
/**
 * @brief Enumerates every path reachable from the vertices stored under
 *        `params.start_tag` and stores them as a path column under
 *        `params.alias`.
 *
 * The traversal is breadth-first: level `depth` holds the paths obtained
 * after `depth` expansion steps (level 0 is the bare start vertex). A level is
 * emitted when `params.hop_lower <= depth < params.hop_upper`. Only edges
 * matching one of the label triplets in `params.labels` are followed, in the
 * direction(s) selected by `params.dir`; for `Direction::kBoth` the outgoing
 * edges of a path end are visited before its incoming edges.
 *
 * @param graph  Read interface used to fetch the schema and edge iterators.
 * @param ctx    Input context; it is reshuffled so that each emitted path is
 *               aligned with the input row it originated from.
 * @param params Expansion options (start tag, alias, direction, label
 *               triplets and hop range).
 * @return The updated context, or an unsupported-operation error when the
 *         direction is not kOut/kIn/kBoth or the start tag does not refer to
 *         a vertex column.
 */
bl::result<Context> PathExpand::edge_expand_p(const GraphReadInterface& graph,
                                              Context&& ctx,
                                              const PathExpandParams& params) {
  const auto dir = params.dir;
  const bool expand_out =
      (dir == Direction::kOut) || (dir == Direction::kBoth);
  const bool expand_in = (dir == Direction::kIn) || (dir == Direction::kBoth);
  if (!expand_out && !expand_in) {
    LOG(ERROR) << "not support path expand options";
    RETURN_UNSUPPORTED_ERROR("not support path expand options");
  }

  // Reject a missing / non-vertex start column instead of dereferencing a
  // null pointer.
  auto input_column =
      std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.start_tag));
  if (input_column == nullptr) {
    LOG(ERROR) << "path expand start tag is not a vertex column";
    RETURN_UNSUPPORTED_ERROR("path expand start tag is not a vertex column");
  }

  // Index the requested triplets by the label of the vertex they are expanded
  // from: source label for outgoing edges, destination label for incoming.
  std::vector<std::vector<LabelTriplet>> out_labels_map(
      graph.schema().vertex_label_num());
  std::vector<std::vector<LabelTriplet>> in_labels_map(
      graph.schema().vertex_label_num());
  for (const auto& triplet : params.labels) {
    out_labels_map[triplet.src_label].emplace_back(triplet);
    in_labels_map[triplet.dst_label].emplace_back(triplet);
  }

  // Each entry pairs a path with the index of the input row it started from.
  using PathEntry = std::pair<std::unique_ptr<PathImpl>, size_t>;
  std::vector<PathEntry> input;   // paths of the current level
  std::vector<PathEntry> output;  // paths of the next level

  foreach_vertex(*input_column, [&](size_t index, label_t label, vid_t v) {
    input.emplace_back(PathImpl::make_path_impl(label, v), index);
  });

  // Appends to `output` one extended copy of `path` per edge yielded by
  // `edge_iter`.
  auto extend_along = [&output](PathImpl& path, size_t index,
                                label_t edge_label, label_t nbr_label,
                                auto&& edge_iter) {
    for (; edge_iter.IsValid(); edge_iter.Next()) {
      output.emplace_back(
          path.expand(edge_label, nbr_label, edge_iter.GetNeighbor()), index);
    }
  };

  std::vector<size_t> shuffle_offset;
  GeneralPathColumnBuilder builder;
  std::shared_ptr<Arena> arena = std::make_shared<Arena>();

  int depth = 0;
  while (depth < params.hop_upper) {
    const bool is_last_level = (depth + 1 >= params.hop_upper);
    output.clear();

    // Build the next level first: emitting below moves the paths out of
    // `input`, so they must not be touched afterwards.
    if (!is_last_level) {
      for (auto& [path, index] : input) {
        auto end = path->get_end();
        if (expand_out) {
          for (const auto& label_triplet : out_labels_map[end.label_]) {
            extend_along(*path, index, label_triplet.edge_label,
                         label_triplet.dst_label,
                         graph.GetOutEdgeIterator(end.label_, end.vid_,
                                                  label_triplet.dst_label,
                                                  label_triplet.edge_label));
          }
        }
        if (expand_in) {
          for (const auto& label_triplet : in_labels_map[end.label_]) {
            extend_along(*path, index, label_triplet.edge_label,
                         label_triplet.src_label,
                         graph.GetInEdgeIterator(end.label_, end.vid_,
                                                 label_triplet.src_label,
                                                 label_triplet.edge_label));
          }
        }
      }
    }

    // Emit the current level once the lower hop bound has been reached. The
    // builder records a handle to the path while the arena takes over
    // ownership of the underlying PathImpl.
    if (depth >= params.hop_lower) {
      for (auto& [path, index] : input) {
        builder.push_back_opt(Path(path.get()));
        arena->emplace_back(std::move(path));
        shuffle_offset.push_back(index);
      }
    }

    if (is_last_level) {
      break;
    }
    input.clear();
    std::swap(input, output);
    ++depth;
  }

  ctx.set_with_reshuffle(params.alias, builder.finish(arena), shuffle_offset);
  return ctx;
}