static bl::result<Context> expand_edge_without_predicate_optional_impl()

in flex/engines/graph_db/runtime/common/operators/retrieve/edge_expand.cc [51:210]


// Optional edge expansion without an edge predicate.
//
// Walks the vertex column stored under `params.v_tag` and builds an optional
// edge column that is stored under `params.alias`:
//   * every edge found for an input vertex appends one row;
//   * an input vertex for which no edge was appended still contributes exactly
//     one row holding a null edge.
// For each appended row the index of the originating input row is recorded and
// passed to `Context::set_with_reshuffle` together with the new column.
//
// Supported shapes: exactly one label triplet, combined with Direction::kOut,
// Direction::kIn, or Direction::kBoth when the triplet's src_label equals its
// dst_label.
//
// Returns the updated context on success. Returns an "unsupported" error when
//   * the shape is not one of the supported ones,
//   * the column under `params.v_tag` is missing or is not a vertex column, or
//   * the input vertex column is itself optional.
static bl::result<Context> expand_edge_without_predicate_optional_impl(
    const GraphReadInterface& graph, Context&& ctx,
    const EdgeExpandParams& params) {
  if (params.labels.size() == 1) {
    const auto& triplet = params.labels[0];
    // Decide the expansion mode before touching the context so that
    // unsupported shapes fall through to the generic error below untouched.
    const bool expand_both = params.dir == Direction::kBoth &&
                             triplet.src_label == triplet.dst_label;
    const bool expand_out = params.dir == Direction::kOut;
    const bool expand_in = params.dir == Direction::kIn;

    if (expand_both || expand_out || expand_in) {
      // A failed dynamic_pointer_cast yields an empty pointer; report it
      // instead of dereferencing it.
      auto input_column =
          std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.v_tag));
      if (input_column == nullptr) {
        LOG(ERROR) << "input of optional edge expand is not a vertex column";
        RETURN_UNSUPPORTED_ERROR(
            "input of optional edge expand is not a vertex column");
      }
      auto& input_vertex_list = *input_column;
      if (input_vertex_list.is_optional()) {
        LOG(ERROR) << "not support optional vertex column as input currently";
        RETURN_UNSUPPORTED_ERROR(
            "not support optional vertex column as input currently");
      }

      // Edge payload type: empty when the edge has no property, the property's
      // own type when there is exactly one, a record view when there are more.
      const auto props = graph.schema().get_edge_properties(
          triplet.src_label, triplet.dst_label, triplet.edge_label);
      PropertyType pt = PropertyType::kEmpty;
      if (props.size() > 1) {
        pt = PropertyType::kRecordView;
      } else if (!props.empty()) {
        pt = props[0];
      }

      // shuffle_offset[i] is the input row index that produced output row i.
      std::vector<size_t> shuffle_offset;

      if (expand_both) {
        auto builder = BDSLEdgeColumnBuilder::optional_builder(triplet, pt);
        foreach_vertex(
            input_vertex_list, [&](size_t index, label_t label, vid_t v) {
              bool has_edge = false;
              if (label == triplet.src_label) {
                auto oe_iter = graph.GetOutEdgeIterator(
                    label, v, triplet.dst_label, triplet.edge_label);
                while (oe_iter.IsValid()) {
                  auto nbr = oe_iter.GetNeighbor();
                  builder.push_back_opt(v, nbr, oe_iter.GetData(),
                                        Direction::kOut);
                  shuffle_offset.push_back(index);
                  has_edge = true;
                  oe_iter.Next();
                }
              }
              if (label == triplet.dst_label) {
                auto ie_iter = graph.GetInEdgeIterator(
                    label, v, triplet.src_label, triplet.edge_label);
                while (ie_iter.IsValid()) {
                  auto nbr = ie_iter.GetNeighbor();
                  builder.push_back_opt(nbr, v, ie_iter.GetData(),
                                        Direction::kIn);
                  shuffle_offset.push_back(index);
                  has_edge = true;
                  ie_iter.Next();
                }
              }
              if (!has_edge) {
                builder.push_back_null();
                shuffle_offset.push_back(index);
              }
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
      } else if (expand_out) {
        auto builder = SDSLEdgeColumnBuilder::optional_builder(Direction::kOut,
                                                               triplet, pt);
        foreach_vertex(
            input_vertex_list, [&](size_t index, label_t label, vid_t v) {
              bool has_edge = false;
              if (label == triplet.src_label) {
                auto oe_iter = graph.GetOutEdgeIterator(
                    label, v, triplet.dst_label, triplet.edge_label);
                while (oe_iter.IsValid()) {
                  auto nbr = oe_iter.GetNeighbor();
                  builder.push_back_opt(v, nbr, oe_iter.GetData());
                  shuffle_offset.push_back(index);
                  has_edge = true;
                  oe_iter.Next();
                }
              }
              // Covers both a label mismatch and a vertex without out-edges.
              if (!has_edge) {
                builder.push_back_null();
                shuffle_offset.push_back(index);
              }
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
      } else {
        auto builder = SDSLEdgeColumnBuilder::optional_builder(Direction::kIn,
                                                               triplet, pt);
        foreach_vertex(
            input_vertex_list, [&](size_t index, label_t label, vid_t v) {
              bool has_edge = false;
              if (label == triplet.dst_label) {
                auto ie_iter = graph.GetInEdgeIterator(
                    label, v, triplet.src_label, triplet.edge_label);
                while (ie_iter.IsValid()) {
                  auto nbr = ie_iter.GetNeighbor();
                  builder.push_back_opt(nbr, v, ie_iter.GetData());
                  shuffle_offset.push_back(index);
                  has_edge = true;
                  ie_iter.Next();
                }
              }
              // Covers both a label mismatch and a vertex without in-edges.
              if (!has_edge) {
                builder.push_back_null();
                shuffle_offset.push_back(index);
              }
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
      }
      return ctx;
    }
  }
  LOG(ERROR) << "not support" << params.labels.size() << " "
             << (int) params.dir;
  RETURN_UNSUPPORTED_ERROR("not support" +
                           std::to_string(params.labels.size()) + " " +
                           std::to_string((int) params.dir));
}