bl::result PathExpand::edge_expand_p()

in flex/engines/graph_db/runtime/common/operators/retrieve/path_expand.cc [231:404]


/**
 * Enumerates paths that start at the vertices of the `params.start_tag`
 * column and follow edges matching `params.labels` in direction `params.dir`.
 *
 * The search proceeds level by level. At depth d the frontier holds every
 * path built by d successive `PathImpl::expand` calls from a start vertex;
 * each frontier path with `params.hop_lower <= d < params.hop_upper` is
 * emitted. No de-duplication or repeated-vertex filtering is performed here.
 *
 * @param graph   Read interface used to iterate outgoing/incoming edges.
 * @param ctx     Input context; it is consumed, extended with the path column
 *                under `params.alias`, and returned.
 * @param params  Start tag, output alias, edge label triplets, direction and
 *                hop range.
 * @return The reshuffled context on success. An unsupported-operation error
 *         is returned when `params.dir` is not kOut/kIn/kBoth or when the
 *         start tag does not resolve to a vertex column.
 */
bl::result<Context> PathExpand::edge_expand_p(const GraphReadInterface& graph,
                                              Context&& ctx,
                                              const PathExpandParams& params) {
  const auto dir = params.dir;
  const bool expand_out =
      (dir == Direction::kOut) || (dir == Direction::kBoth);
  const bool expand_in = (dir == Direction::kIn) || (dir == Direction::kBoth);
  if (!expand_out && !expand_in) {
    LOG(ERROR) << "not support path expand options";
    RETURN_UNSUPPORTED_ERROR("not support path expand options");
  }

  // Keep the casted pointer in a local and verify it before dereferencing:
  // dynamic_pointer_cast yields nullptr when the column is absent or is not
  // an IVertexColumn.
  auto start_column =
      std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.start_tag));
  if (start_column == nullptr) {
    LOG(ERROR) << "path expand start tag is not a vertex column";
    RETURN_UNSUPPORTED_ERROR("path expand start tag is not a vertex column");
  }
  auto& input_vertex_list = *start_column;

  // Group the allowed triplets by the label of the vertex being expanded:
  // by source label for outgoing edges, by destination label for incoming.
  const auto vertex_label_num = graph.schema().vertex_label_num();
  std::vector<std::vector<LabelTriplet>> out_labels_map(vertex_label_num),
      in_labels_map(vertex_label_num);
  for (const auto& triplet : params.labels) {
    out_labels_map[triplet.src_label].emplace_back(triplet);
    in_labels_map[triplet.dst_label].emplace_back(triplet);
  }

  // Each entry pairs a partial path with the input row index it started from.
  using PathEntry = std::pair<std::unique_ptr<PathImpl>, size_t>;
  std::vector<PathEntry> frontier;
  std::vector<PathEntry> next_frontier;
  foreach_vertex(input_vertex_list,
                 [&](size_t index, label_t label, vid_t v) {
                   frontier.emplace_back(PathImpl::make_path_impl(label, v),
                                         index);
                 });

  GeneralPathColumnBuilder builder;
  std::shared_ptr<Arena> arena = std::make_shared<Arena>();
  // shuffle_offset[i] is the input row index of the i-th emitted path.
  std::vector<size_t> shuffle_offset;

  for (int depth = 0; depth < params.hop_upper; ++depth) {
    const bool last_level = (depth + 1 >= params.hop_upper);
    next_frontier.clear();

    // Build the next level first: emitting below moves the paths out of
    // `frontier`, so they must be expanded while they are still owned here.
    if (!last_level) {
      for (auto& [path, index] : frontier) {
        const auto end = path->get_end();
        if (expand_out) {
          for (const auto& triplet : out_labels_map[end.label_]) {
            auto oe_iter = graph.GetOutEdgeIterator(
                end.label_, end.vid_, triplet.dst_label, triplet.edge_label);
            for (; oe_iter.IsValid(); oe_iter.Next()) {
              next_frontier.emplace_back(
                  path->expand(triplet.edge_label, triplet.dst_label,
                               oe_iter.GetNeighbor()),
                  index);
            }
          }
        }
        if (expand_in) {
          for (const auto& triplet : in_labels_map[end.label_]) {
            auto ie_iter = graph.GetInEdgeIterator(
                end.label_, end.vid_, triplet.src_label, triplet.edge_label);
            for (; ie_iter.IsValid(); ie_iter.Next()) {
              next_frontier.emplace_back(
                  path->expand(triplet.edge_label, triplet.src_label,
                               ie_iter.GetNeighbor()),
                  index);
            }
          }
        }
      }
    }

    // Emit the current level once the lower hop bound has been reached. The
    // column stores a Path built from the raw pointer, while ownership of the
    // PathImpl is transferred to the arena passed to builder.finish().
    if (depth >= params.hop_lower) {
      for (auto& [path, index] : frontier) {
        builder.push_back_opt(Path(path.get()));
        arena->emplace_back(std::move(path));
        shuffle_offset.push_back(index);
      }
    }
    if (last_level) {
      break;
    }

    frontier.clear();
    std::swap(frontier, next_frontier);
  }

  ctx.set_with_reshuffle(params.alias, builder.finish(arena), shuffle_offset);
  // `ctx` is an rvalue-reference parameter; it is implicitly moved on return
  // only from C++20 on, so move explicitly to avoid copying the Context.
  return std::move(ctx);
}