in flex/engines/graph_db/runtime/common/operators/retrieve/edge_expand.cc [51:210]
/**
 * @brief Optional edge expansion without an edge predicate.
 *
 * Walks the vertex column stored under `params.v_tag` and builds a new
 * optional edge column under `params.alias`. Every matching edge of an input
 * vertex produces one output row; an input vertex that produces no edge
 * (no adjacent edge, or its label does not match the triplet endpoint for the
 * requested direction) produces exactly one null row. The context is
 * reshuffled with the input row index recorded for each output row.
 *
 * Supported configurations (everything else yields an unsupported error):
 *  - exactly one label triplet, and
 *  - direction kOut, direction kIn, or direction kBoth when the triplet's
 *    source and destination labels are identical.
 *
 * @param graph  Read interface used to look up the schema and edge iterators.
 * @param ctx    Input context; receives the new edge column.
 * @param params Expansion parameters (input tag, alias, direction, labels).
 * @return The updated context, or an unsupported error when the input column
 *         is missing / not a vertex column / optional, or the configuration
 *         is not handled.
 */
static bl::result<Context> expand_edge_without_predicate_optional_impl(
    const GraphReadInterface& graph, Context&& ctx,
    const EdgeExpandParams& params) {
  // Only a single label triplet is handled by this implementation.
  if (params.labels.size() == 1) {
    const auto& triplet = params.labels[0];
    const bool expand_both = params.dir == Direction::kBoth &&
                             triplet.src_label == triplet.dst_label;
    const bool expand_out = params.dir == Direction::kOut;
    const bool expand_in = params.dir == Direction::kIn;

    if (expand_both || expand_out || expand_in) {
      // The cast yields nullptr when the tag holds no column or a column of
      // another kind; fail gracefully instead of dereferencing it.
      auto input_column =
          std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.v_tag));
      if (input_column == nullptr) {
        LOG(ERROR) << "input column of edge expand is not a vertex column";
        RETURN_UNSUPPORTED_ERROR(
            "input column of edge expand is not a vertex column");
      }
      auto& input_vertex_list = *input_column;
      if (input_vertex_list.is_optional()) {
        LOG(ERROR) << "not support optional vertex column as input currently";
        RETURN_UNSUPPORTED_ERROR(
            "not support optional vertex column as input currently");
      }

      // Edge data type: empty when the edge has no property, the property's
      // own type when there is exactly one, a record view when several.
      const auto props = graph.schema().get_edge_properties(
          triplet.src_label, triplet.dst_label, triplet.edge_label);
      PropertyType pt = PropertyType::kEmpty;
      if (!props.empty()) {
        pt = props[0];
      }
      if (props.size() > 1) {
        pt = PropertyType::kRecordView;
      }

      // shuffle_offset[i] is the input row index that output row i came from.
      std::vector<size_t> shuffle_offset;

      if (expand_both) {
        auto builder = BDSLEdgeColumnBuilder::optional_builder(triplet, pt);
        foreach_vertex(
            input_vertex_list, [&](size_t index, label_t label, vid_t v) {
              bool has_edge = false;
              if (label == triplet.src_label) {
                auto oe_iter = graph.GetOutEdgeIterator(
                    label, v, triplet.dst_label, triplet.edge_label);
                while (oe_iter.IsValid()) {
                  auto nbr = oe_iter.GetNeighbor();
                  builder.push_back_opt(v, nbr, oe_iter.GetData(),
                                        Direction::kOut);
                  shuffle_offset.push_back(index);
                  has_edge = true;
                  oe_iter.Next();
                }
              }
              if (label == triplet.dst_label) {
                auto ie_iter = graph.GetInEdgeIterator(
                    label, v, triplet.src_label, triplet.edge_label);
                while (ie_iter.IsValid()) {
                  auto nbr = ie_iter.GetNeighbor();
                  builder.push_back_opt(nbr, v, ie_iter.GetData(),
                                        Direction::kIn);
                  shuffle_offset.push_back(index);
                  has_edge = true;
                  ie_iter.Next();
                }
              }
              // Keep the input row alive with a null edge.
              if (!has_edge) {
                builder.push_back_null();
                shuffle_offset.push_back(index);
              }
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
      } else if (expand_out) {
        auto builder = SDSLEdgeColumnBuilder::optional_builder(Direction::kOut,
                                                               triplet, pt);
        foreach_vertex(
            input_vertex_list, [&](size_t index, label_t label, vid_t v) {
              bool has_edge = false;
              if (label == triplet.src_label) {
                auto oe_iter = graph.GetOutEdgeIterator(
                    label, v, triplet.dst_label, triplet.edge_label);
                while (oe_iter.IsValid()) {
                  auto nbr = oe_iter.GetNeighbor();
                  builder.push_back_opt(v, nbr, oe_iter.GetData());
                  shuffle_offset.push_back(index);
                  has_edge = true;
                  oe_iter.Next();
                }
              }
              // Label mismatch or no outgoing edge: emit a single null row.
              if (!has_edge) {
                builder.push_back_null();
                shuffle_offset.push_back(index);
              }
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
      } else {
        auto builder = SDSLEdgeColumnBuilder::optional_builder(Direction::kIn,
                                                               triplet, pt);
        foreach_vertex(
            input_vertex_list, [&](size_t index, label_t label, vid_t v) {
              bool has_edge = false;
              if (label == triplet.dst_label) {
                auto ie_iter = graph.GetInEdgeIterator(
                    label, v, triplet.src_label, triplet.edge_label);
                while (ie_iter.IsValid()) {
                  auto nbr = ie_iter.GetNeighbor();
                  builder.push_back_opt(nbr, v, ie_iter.GetData());
                  shuffle_offset.push_back(index);
                  has_edge = true;
                  ie_iter.Next();
                }
              }
              // Label mismatch or no incoming edge: emit a single null row.
              if (!has_edge) {
                builder.push_back_null();
                shuffle_offset.push_back(index);
              }
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
      }
      return ctx;
    }
  }

  // Multiple triplets, or kBoth with differing endpoint labels.
  LOG(ERROR) << "not support " << params.labels.size() << " "
             << static_cast<int>(params.dir);
  RETURN_UNSUPPORTED_ERROR("not support " +
                           std::to_string(params.labels.size()) + " " +
                           std::to_string(static_cast<int>(params.dir)));
}