static bl::result<Context> get_vertex_from_edges()

in flex/engines/graph_db/runtime/common/operators/retrieve/get_v.h [140:450]


  /**
   * Builds a vertex column out of the path or edge column stored at
   * `params.tag`, installs it in `ctx` under `params.alias`, and reshuffles
   * the context so that only the input rows that produced a vertex remain.
   *
   * Dispatch overview:
   *  - Path column: the end vertex of every path is emitted (no filtering).
   *  - Optional edge column: delegated to
   *    get_vertex_from_edges_optional_impl().
   *  - kSDSL / kSDML (the column exposes a single `dir()`): `VOpt::kOther` is
   *    resolved to kEnd when `dir()` is kOut and to kStart otherwise; the
   *    source or destination endpoint is then emitted accordingly.
   *  - kBDSL / kBDML (direction supplied per edge): the destination is
   *    emitted for kOut edges and the source for all other edges.
   *
   * `params.tables`, when non-empty, restricts the vertex labels that are
   * kept. A single-label builder is used when exactly one output label is
   * possible, a multi-label builder otherwise.
   *
   * @param graph  Graph handle; only its schema is read here (kBDML branch),
   *               and it is forwarded to the optional implementation.
   * @param ctx    Context that is consumed, updated and returned.
   * @param params Input tag, output alias, endpoint option and label filter.
   * @param pred   Vertex predicate called as pred(label, vid, row_index).
   *               NOTE: within this function it is evaluated only in the
   *               kSDSL branch; it is also forwarded to the optional
   *               implementation. The remaining branches do not consult it.
   * @return The updated context, or an error produced by
   *         RETURN_UNSUPPORTED_ERROR / RETURN_BAD_REQUEST_ERROR when the
   *         column type, edge column type or GetV option is not handled, or
   *         when the requested label contradicts the edge's endpoint label.
   */
  static bl::result<Context> get_vertex_from_edges(
      const GraphReadInterface& graph, Context&& ctx, const GetVParams& params,
      const PRED_T& pred) {
    // Row indices of the input context that survive; parallel to the builder.
    std::vector<size_t> shuffle_offset;
    auto col = ctx.get(params.tag);
    if (col->column_type() == ContextColumnType::kPath) {
      auto& input_path_list =
          *std::dynamic_pointer_cast<GeneralPathColumn>(col);

      // Every path contributes its end vertex; labels may differ per path, so
      // a multi-label builder is used.
      auto builder = MLVertexColumnBuilder::builder();
      input_path_list.foreach_path([&](size_t index, const Path& path) {
        auto [label, vid] = path.get_end();
        builder.push_back_vertex({label, vid});
        shuffle_offset.push_back(index);
      });
      ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                             shuffle_offset);
      return ctx;
    }

    // Reuse the column fetched above instead of looking the tag up again.
    auto column = std::dynamic_pointer_cast<IEdgeColumn>(col);
    if (!column) {
      LOG(ERROR) << "Unsupported column type: "
                 << static_cast<int>(col->column_type());
      RETURN_UNSUPPORTED_ERROR(
          "Unsupported column type: " +
          std::to_string(static_cast<int>(col->column_type())));
    }

    if (column->is_optional()) {
      return get_vertex_from_edges_optional_impl(graph, std::move(ctx), params,
                                                 pred);
    }

    if (column->edge_column_type() == EdgeColumnType::kSDSL) {
      auto& input_edge_list =
          *std::dynamic_pointer_cast<SDSLEdgeColumn>(column);
      label_t output_vertex_label{0};
      auto edge_label = input_edge_list.get_labels()[0];

      // Resolve kOther using the column-wide direction.
      VOpt opt = params.opt;
      if (params.opt == VOpt::kOther) {
        if (input_edge_list.dir() == Direction::kOut) {
          opt = VOpt::kEnd;
        } else {
          opt = VOpt::kStart;
        }
      }
      if (opt == VOpt::kStart) {
        output_vertex_label = edge_label.src_label;
      } else if (opt == VOpt::kEnd) {
        output_vertex_label = edge_label.dst_label;
      } else {
        LOG(ERROR) << "not support GetV opt " << static_cast<int>(opt);
        RETURN_UNSUPPORTED_ERROR("not support GetV opt " +
                                 std::to_string(static_cast<int>(opt)));
      }
      // params.tables may be empty (no label constraint). When exactly one
      // label is requested it has to agree with the endpoint label.
      if (params.tables.size() == 1) {
        if (output_vertex_label != params.tables[0]) {
          LOG(ERROR) << "output_vertex_label != params.tables[0]: "
                     << static_cast<int>(output_vertex_label) << " "
                     << static_cast<int>(params.tables[0]);
          RETURN_BAD_REQUEST_ERROR("output_vertex_label != params.tables[0]");
        }
      }
      auto builder = SLVertexColumnBuilder::builder(output_vertex_label);
      if (opt == VOpt::kStart) {
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (pred(label.src_label, src, index)) {
                builder.push_back_opt(src);
                shuffle_offset.push_back(index);
              }
            });
      } else if (opt == VOpt::kEnd) {
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (pred(label.dst_label, dst, index)) {
                builder.push_back_opt(dst);
                shuffle_offset.push_back(index);
              }
            });
      }
      ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                             shuffle_offset);
      return ctx;
    } else if (column->edge_column_type() == EdgeColumnType::kSDML) {
      auto& input_edge_list =
          *std::dynamic_pointer_cast<SDMLEdgeColumn>(column);
      // Resolve kOther using the column-wide direction.
      VOpt opt = params.opt;
      if (params.opt == VOpt::kOther) {
        if (input_edge_list.dir() == Direction::kOut) {
          opt = VOpt::kEnd;
        } else {
          opt = VOpt::kStart;
        }
      }

      auto labels =
          extract_labels(input_edge_list.get_labels(), params.tables, opt);
      if (labels.size() == 0) {
        // No candidate vertex label: the result is an empty column.
        auto builder = MLVertexColumnBuilder::builder();
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr), {});
        return ctx;
      }
      if (labels.size() > 1) {
        auto builder = MLVertexColumnBuilder::builder();
        if (opt == VOpt::kStart) {
          input_edge_list.foreach_edge(
              [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                  const EdgeData& edata, Direction dir) {
                if (std::find(labels.begin(), labels.end(), label.src_label) !=
                    labels.end()) {
                  builder.push_back_vertex({label.src_label, src});
                  shuffle_offset.push_back(index);
                }
              });
        } else if (opt == VOpt::kEnd) {
          input_edge_list.foreach_edge(
              [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                  const EdgeData& edata, Direction dir) {
                if (std::find(labels.begin(), labels.end(), label.dst_label) !=
                    labels.end()) {
                  builder.push_back_vertex({label.dst_label, dst});
                  shuffle_offset.push_back(index);
                }
              });
        }
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
        return ctx;
      }

      // Exactly one candidate label is left. Without this case control would
      // leave the if/else chain and report the (supported) kSDML column as an
      // unsupported edge column type. Emit a single-label column and keep
      // only the edges whose selected endpoint carries that label, mirroring
      // the single-label handling of the kBDSL branch.
      const label_t only_label = *labels.begin();
      auto builder = SLVertexColumnBuilder::builder(only_label);
      if (opt == VOpt::kStart) {
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (label.src_label == only_label) {
                builder.push_back_opt(src);
                shuffle_offset.push_back(index);
              }
            });
      } else if (opt == VOpt::kEnd) {
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (label.dst_label == only_label) {
                builder.push_back_opt(dst);
                shuffle_offset.push_back(index);
              }
            });
      }
      ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                             shuffle_offset);
      return ctx;
    } else if (column->edge_column_type() == EdgeColumnType::kBDSL) {
      auto& input_edge_list =
          *std::dynamic_pointer_cast<BDSLEdgeColumn>(column);
      if (params.tables.size() == 0) {
        auto type = input_edge_list.get_labels()[0];
        if (type.src_label != type.dst_label) {
          // Endpoints have different labels, so the output needs a
          // multi-label column. Only kOther is accepted in this case.
          auto builder = MLVertexColumnBuilder::builder();
          if (params.opt != VOpt::kOther) {
            LOG(ERROR) << "not support GetV opt "
                       << static_cast<int>(params.opt);
            RETURN_UNSUPPORTED_ERROR(
                "not support GetV opt " +
                std::to_string(static_cast<int>(params.opt)));
          }
          input_edge_list.foreach_edge(
              [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                  const EdgeData& edata, Direction dir) {
                if (dir == Direction::kOut) {
                  builder.push_back_vertex({label.dst_label, dst});
                } else {
                  builder.push_back_vertex({label.src_label, src});
                }
                shuffle_offset.push_back(index);
              });
          ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                                 shuffle_offset);
          return ctx;
        } else {
          // Both endpoints share one label: a single-label column suffices.
          auto builder = SLVertexColumnBuilder::builder(type.src_label);
          input_edge_list.foreach_edge(
              [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                  const EdgeData& edata, Direction dir) {
                if (dir == Direction::kOut) {
                  builder.push_back_opt(dst);
                  shuffle_offset.push_back(index);
                } else {
                  builder.push_back_opt(src);
                  shuffle_offset.push_back(index);
                }
              });
          ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                                 shuffle_offset);
          return ctx;
        }
      } else {
        // Keep only the requested labels that match one of the edge's
        // endpoint labels.
        std::vector<label_t> labels;
        auto type = input_edge_list.get_labels()[0];
        for (auto& label : params.tables) {
          if (label == type.src_label || label == type.dst_label) {
            labels.push_back(label);
          }
        }
        if (labels.size() == 1) {
          auto builder = SLVertexColumnBuilder::builder(labels[0]);
          input_edge_list.foreach_edge(
              [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                  const EdgeData& edata, Direction dir) {
                if (dir == Direction::kOut) {
                  if (label.dst_label == labels[0]) {
                    builder.push_back_opt(dst);
                    shuffle_offset.push_back(index);
                  }
                } else {
                  if (label.src_label == labels[0]) {
                    builder.push_back_opt(src);
                    shuffle_offset.push_back(index);
                  }
                }
              });
          ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                                 shuffle_offset);
          return ctx;
        } else {
          // Zero or several labels matched. With zero matches no edge passes
          // the std::find test and the resulting column is empty.
          auto builder = MLVertexColumnBuilder::builder();
          input_edge_list.foreach_edge(
              [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                  const EdgeData& edata, Direction dir) {
                if (dir == Direction::kOut) {
                  if (std::find(labels.begin(), labels.end(),
                                label.dst_label) != labels.end()) {
                    builder.push_back_vertex({label.dst_label, dst});
                    shuffle_offset.push_back(index);
                  }
                } else {
                  if (std::find(labels.begin(), labels.end(),
                                label.src_label) != labels.end()) {
                    builder.push_back_vertex({label.src_label, src});
                    shuffle_offset.push_back(index);
                  }
                }
              });
          ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                                 shuffle_offset);
          return ctx;
        }
      }
    } else if (column->edge_column_type() == EdgeColumnType::kBDML) {
      auto& input_edge_list =
          *std::dynamic_pointer_cast<BDMLEdgeColumn>(column);
      if (params.tables.size() == 0) {
        // No label constraint: every edge yields a vertex. Only kOther is
        // accepted in this case.
        auto builder = MLVertexColumnBuilder::builder();
        if (params.opt != VOpt::kOther) {
          LOG(ERROR) << "not support GetV opt " << static_cast<int>(params.opt);
          RETURN_UNSUPPORTED_ERROR(
              "not support GetV opt " +
              std::to_string(static_cast<int>(params.opt)));
        }
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (dir == Direction::kOut) {
                builder.push_back_vertex({label.dst_label, dst});
              } else {
                builder.push_back_vertex({label.src_label, src});
              }
              shuffle_offset.push_back(index);
            });

        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
        return ctx;
      } else {
        if (params.tables.size() == 1) {
          auto vlabel = params.tables[0];
          auto builder = SLVertexColumnBuilder::builder(vlabel);
          input_edge_list.foreach_edge(
              [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                  const EdgeData& edata, Direction dir) {
                if (dir == Direction::kOut) {
                  if (label.dst_label == vlabel) {
                    builder.push_back_opt(dst);
                    shuffle_offset.push_back(index);
                  }
                } else {
                  if (label.src_label == vlabel) {
                    builder.push_back_opt(src);
                    shuffle_offset.push_back(index);
                  }
                }
              });
          ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                                 shuffle_offset);

        } else {
          // Membership mask indexed by vertex label, sized from the schema.
          std::vector<bool> labels(graph.schema().vertex_label_num(), false);
          for (auto& label : params.tables) {
            // Skip requested labels that fall outside the mask instead of
            // writing past its end.
            if (static_cast<size_t>(label) < labels.size()) {
              labels[label] = true;
            }
          }
          auto builder = MLVertexColumnBuilder::builder();
          input_edge_list.foreach_edge(
              [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                  const EdgeData& edata, Direction dir) {
                if (dir == Direction::kOut) {
                  if (labels[label.dst_label]) {
                    builder.push_back_vertex({label.dst_label, dst});
                    shuffle_offset.push_back(index);
                  }
                } else {
                  if (labels[label.src_label]) {
                    builder.push_back_vertex({label.src_label, src});
                    shuffle_offset.push_back(index);
                  }
                }
              });
          ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                                 shuffle_offset);
        }
        return ctx;
      }
    }

    // Reached only for edge column types that none of the branches handle.
    LOG(ERROR) << "Unsupported edge column type: "
               << static_cast<int>(column->edge_column_type());
    RETURN_UNSUPPORTED_ERROR(
        "Unsupported edge column type: " +
        std::to_string(static_cast<int>(column->edge_column_type())));
  }