bl::result PathExpand::edge_expand_v()

in flex/engines/graph_db/runtime/common/operators/retrieve/path_expand.cc [24:229]


bl::result<Context> PathExpand::edge_expand_v(const GraphReadInterface& graph,
                                              Context&& ctx,
                                              const PathExpandParams& params) {
  std::vector<size_t> shuffle_offset;
  if (params.labels.size() == 1 &&
      ctx.get(params.start_tag)->column_type() == ContextColumnType::kVertex &&
      std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.start_tag))
              ->vertex_column_type() == VertexColumnType::kSingle) {
    auto& input_vertex_list =
        *std::dynamic_pointer_cast<SLVertexColumn>(ctx.get(params.start_tag));
    auto pair = path_expand_vertex_without_predicate_impl(
        graph, input_vertex_list, params.labels, params.dir, params.hop_lower,
        params.hop_upper);
    ctx.set_with_reshuffle(params.alias, pair.first, pair.second);
    return ctx;
  } else {
    if (params.dir == Direction::kOut) {
      auto& input_vertex_list =
          *std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.start_tag));
      std::set<label_t> labels;
      std::vector<std::vector<LabelTriplet>> out_labels_map(
          graph.schema().vertex_label_num());
      for (const auto& label : params.labels) {
        labels.emplace(label.dst_label);
        out_labels_map[label.src_label].emplace_back(label);
      }

      auto builder = MLVertexColumnBuilder::builder(labels);
      std::vector<std::tuple<label_t, vid_t, size_t>> input;
      std::vector<std::tuple<label_t, vid_t, size_t>> output;
      foreach_vertex(input_vertex_list,
                     [&](size_t index, label_t label, vid_t v) {
                       output.emplace_back(label, v, index);
                     });
      int depth = 0;
      while (depth < params.hop_upper && (!output.empty())) {
        input.clear();
        std::swap(input, output);
        if (depth >= params.hop_lower) {
          for (auto& tuple : input) {
            builder.push_back_vertex({std::get<0>(tuple), std::get<1>(tuple)});
            shuffle_offset.push_back(std::get<2>(tuple));
          }
        }

        if (depth + 1 >= params.hop_upper) {
          break;
        }

        for (auto& tuple : input) {
          auto label = std::get<0>(tuple);
          auto v = std::get<1>(tuple);
          auto index = std::get<2>(tuple);
          for (const auto& label_triplet : out_labels_map[label]) {
            auto oe_iter = graph.GetOutEdgeIterator(label_triplet.src_label, v,
                                                    label_triplet.dst_label,
                                                    label_triplet.edge_label);

            while (oe_iter.IsValid()) {
              auto nbr = oe_iter.GetNeighbor();
              output.emplace_back(label_triplet.dst_label, nbr, index);
              oe_iter.Next();
            }
          }
        }
        ++depth;
      }
      ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                             shuffle_offset);
      return ctx;
    } else if (params.dir == Direction::kIn) {
      auto& input_vertex_list =
          *std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.start_tag));
      std::set<label_t> labels;
      std::vector<std::vector<LabelTriplet>> in_labels_map(
          graph.schema().vertex_label_num());
      for (auto& label : params.labels) {
        labels.emplace(label.src_label);
        in_labels_map[label.dst_label].emplace_back(label);
      }

      auto builder = MLVertexColumnBuilder::builder(labels);
      std::vector<std::tuple<label_t, vid_t, size_t>> input;
      std::vector<std::tuple<label_t, vid_t, size_t>> output;
      foreach_vertex(input_vertex_list,
                     [&](size_t index, label_t label, vid_t v) {
                       output.emplace_back(label, v, index);
                     });
      int depth = 0;
      while (depth < params.hop_upper && (!output.empty())) {
        input.clear();
        std::swap(input, output);
        if (depth >= params.hop_lower) {
          for (const auto& tuple : input) {
            builder.push_back_vertex({std::get<0>(tuple), std::get<1>(tuple)});
            shuffle_offset.push_back(std::get<2>(tuple));
          }
        }

        if (depth + 1 >= params.hop_upper) {
          break;
        }

        for (const auto& tuple : input) {
          auto label = std::get<0>(tuple);
          auto v = std::get<1>(tuple);
          auto index = std::get<2>(tuple);
          for (const auto& label_triplet : in_labels_map[label]) {
            auto oe_iter = graph.GetInEdgeIterator(label_triplet.dst_label, v,
                                                   label_triplet.src_label,
                                                   label_triplet.edge_label);

            while (oe_iter.IsValid()) {
              auto nbr = oe_iter.GetNeighbor();
              output.emplace_back(label_triplet.src_label, nbr, index);
              oe_iter.Next();
            }
          }
        }
        ++depth;
      }
      ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                             shuffle_offset);
      return ctx;
    } else if (params.dir == Direction::kBoth) {
      std::set<label_t> labels;
      std::vector<std::vector<LabelTriplet>> in_labels_map(
          graph.schema().vertex_label_num()),
          out_labels_map(graph.schema().vertex_label_num());
      for (const auto& label : params.labels) {
        labels.emplace(label.dst_label);
        in_labels_map[label.dst_label].emplace_back(label);
        out_labels_map[label.src_label].emplace_back(label);
      }

      auto builder = MLVertexColumnBuilder::builder(labels);
      std::vector<std::tuple<label_t, vid_t, size_t>> input;
      std::vector<std::tuple<label_t, vid_t, size_t>> output;
      auto input_vertex_list =
          std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.start_tag));
      if (input_vertex_list->vertex_column_type() ==
          VertexColumnType::kMultiple) {
        auto& input_vertex_list = *std::dynamic_pointer_cast<MLVertexColumn>(
            ctx.get(params.start_tag));

        input_vertex_list.foreach_vertex(
            [&](size_t index, label_t label, vid_t v) {
              output.emplace_back(label, v, index);
            });
      } else {
        foreach_vertex(*input_vertex_list,
                       [&](size_t index, label_t label, vid_t v) {
                         output.emplace_back(label, v, index);
                       });
      }
      int depth = 0;
      while (depth < params.hop_upper && (!output.empty())) {
        input.clear();
        std::swap(input, output);
        if (depth >= params.hop_lower) {
          for (auto& tuple : input) {
            builder.push_back_vertex({std::get<0>(tuple), std::get<1>(tuple)});
            shuffle_offset.push_back(std::get<2>(tuple));
          }
        }

        if (depth + 1 >= params.hop_upper) {
          break;
        }

        for (auto& tuple : input) {
          auto label = std::get<0>(tuple);
          auto v = std::get<1>(tuple);
          auto index = std::get<2>(tuple);
          for (const auto& label_triplet : out_labels_map[label]) {
            auto oe_iter = graph.GetOutEdgeIterator(label_triplet.src_label, v,
                                                    label_triplet.dst_label,
                                                    label_triplet.edge_label);

            while (oe_iter.IsValid()) {
              auto nbr = oe_iter.GetNeighbor();
              output.emplace_back(label_triplet.dst_label, nbr, index);
              oe_iter.Next();
            }
          }
          for (const auto& label_triplet : in_labels_map[label]) {
            auto ie_iter = graph.GetInEdgeIterator(label_triplet.dst_label, v,
                                                   label_triplet.src_label,
                                                   label_triplet.edge_label);
            while (ie_iter.IsValid()) {
              auto nbr = ie_iter.GetNeighbor();
              output.emplace_back(label_triplet.src_label, nbr, index);
              ie_iter.Next();
            }
          }
        }
        depth++;
      }
      ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                             shuffle_offset);
      return ctx;
    }
  }
  LOG(ERROR) << "not support path expand options";
  RETURN_UNSUPPORTED_ERROR("not support path expand options");
}