in flex/engines/graph_db/runtime/common/operators/retrieve/edge_expand.h [40:339]
static bl::result<Context> expand_edge(const GraphReadInterface& graph,
Context&& ctx,
const EdgeExpandParams& params,
const PRED_T& pred) {
if (params.is_optional) {
LOG(ERROR) << "not support optional edge expand";
RETURN_UNSUPPORTED_ERROR("not support optional edge expand");
}
std::vector<size_t> shuffle_offset;
std::shared_ptr<IVertexColumn> input_vertex_list_ptr =
std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.v_tag));
VertexColumnType input_vertex_list_type =
input_vertex_list_ptr->vertex_column_type();
if (params.labels.size() == 1) {
if (input_vertex_list_type == VertexColumnType::kSingle) {
auto casted_input_vertex_list =
std::dynamic_pointer_cast<SLVertexColumn>(input_vertex_list_ptr);
auto pair =
expand_edge_impl<PRED_T>(graph, *casted_input_vertex_list,
params.labels[0], pred, params.dir);
if (pair.first != nullptr) {
ctx.set_with_reshuffle(params.alias, pair.first, pair.second);
return ctx;
}
}
LOG(INFO) << "not hit, fallback";
if (params.dir == Direction::kIn) {
auto& input_vertex_list = *input_vertex_list_ptr;
label_t output_vertex_label = params.labels[0].src_label;
label_t edge_label = params.labels[0].edge_label;
auto& props = graph.schema().get_edge_properties(
params.labels[0].src_label, params.labels[0].dst_label,
params.labels[0].edge_label);
PropertyType pt = PropertyType::kEmpty;
if (!props.empty()) {
pt = props[0];
}
if (props.size() > 1) {
pt = PropertyType::kRecordView;
}
auto builder = SDSLEdgeColumnBuilder::builder(Direction::kIn,
params.labels[0], pt);
foreach_vertex(input_vertex_list,
[&](size_t index, label_t label, vid_t v) {
auto ie_iter = graph.GetInEdgeIterator(
label, v, output_vertex_label, edge_label);
while (ie_iter.IsValid()) {
auto nbr = ie_iter.GetNeighbor();
if (pred(params.labels[0], nbr, v, ie_iter.GetData(),
Direction::kIn, index)) {
assert(ie_iter.GetData().type == pt);
builder.push_back_opt(nbr, v, ie_iter.GetData());
shuffle_offset.push_back(index);
}
ie_iter.Next();
}
});
ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
shuffle_offset);
return ctx;
} else if (params.dir == Direction::kOut) {
auto& input_vertex_list =
*std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.v_tag));
label_t output_vertex_label = params.labels[0].dst_label;
label_t edge_label = params.labels[0].edge_label;
label_t src_label = params.labels[0].src_label;
auto& props = graph.schema().get_edge_properties(
params.labels[0].src_label, params.labels[0].dst_label,
params.labels[0].edge_label);
PropertyType pt = PropertyType::kEmpty;
if (!props.empty()) {
pt = props[0];
}
if (props.size() > 1) {
pt = PropertyType::kRecordView;
}
auto builder = SDSLEdgeColumnBuilder::builder(Direction::kOut,
params.labels[0], pt);
foreach_vertex(input_vertex_list,
[&](size_t index, label_t label, vid_t v) {
if (label != src_label) {
return;
}
auto oe_iter = graph.GetOutEdgeIterator(
label, v, output_vertex_label, edge_label);
while (oe_iter.IsValid()) {
auto nbr = oe_iter.GetNeighbor();
if (pred(params.labels[0], v, nbr, oe_iter.GetData(),
Direction::kOut, index)) {
assert(oe_iter.GetData().type == pt);
builder.push_back_opt(v, nbr, oe_iter.GetData());
shuffle_offset.push_back(index);
}
oe_iter.Next();
}
});
ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
shuffle_offset);
return ctx;
} else {
auto& input_vertex_list = *input_vertex_list_ptr;
auto& props = graph.schema().get_edge_properties(
params.labels[0].src_label, params.labels[0].dst_label,
params.labels[0].edge_label);
auto src_label = params.labels[0].src_label;
auto dst_label = params.labels[0].dst_label;
auto edge_label = params.labels[0].edge_label;
PropertyType pt = PropertyType::kEmpty;
if (!props.empty()) {
pt = props[0];
}
if (props.size() > 1) {
pt = PropertyType::kRecordView;
}
auto builder = BDSLEdgeColumnBuilder::builder(params.labels[0], pt);
foreach_vertex(
input_vertex_list, [&](size_t index, label_t label, vid_t v) {
if (label == src_label) {
auto oe_iter =
graph.GetOutEdgeIterator(label, v, dst_label, edge_label);
while (oe_iter.IsValid()) {
auto nbr = oe_iter.GetNeighbor();
if (pred(params.labels[0], v, nbr, oe_iter.GetData(),
Direction::kOut, index)) {
assert(oe_iter.GetData().type == pt);
builder.push_back_opt(v, nbr, oe_iter.GetData(),
Direction::kOut);
shuffle_offset.push_back(index);
}
oe_iter.Next();
}
}
if (label == dst_label) {
auto ie_iter =
graph.GetInEdgeIterator(label, v, src_label, edge_label);
while (ie_iter.IsValid()) {
auto nbr = ie_iter.GetNeighbor();
if (pred(params.labels[0], nbr, v, ie_iter.GetData(),
Direction::kIn, index)) {
assert(ie_iter.GetData().type == pt);
builder.push_back_opt(nbr, v, ie_iter.GetData(),
Direction::kIn);
shuffle_offset.push_back(index);
}
ie_iter.Next();
}
}
});
ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
shuffle_offset);
return ctx;
}
} else {
LOG(INFO) << "not hit, fallback";
if (params.dir == Direction::kBoth) {
auto& input_vertex_list =
*std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.v_tag));
std::vector<std::pair<LabelTriplet, PropertyType>> label_props;
for (auto& triplet : params.labels) {
auto& props = graph.schema().get_edge_properties(
triplet.src_label, triplet.dst_label, triplet.edge_label);
PropertyType pt = PropertyType::kEmpty;
if (!props.empty()) {
pt = props[0];
}
label_props.emplace_back(triplet, pt);
}
auto builder = BDMLEdgeColumnBuilder::builder(label_props);
foreach_vertex(
input_vertex_list, [&](size_t index, label_t label, vid_t v) {
for (auto& label_prop : label_props) {
auto& triplet = label_prop.first;
if (label == triplet.src_label) {
auto oe_iter = graph.GetOutEdgeIterator(
label, v, triplet.dst_label, triplet.edge_label);
while (oe_iter.IsValid()) {
auto nbr = oe_iter.GetNeighbor();
if (pred(triplet, v, nbr, oe_iter.GetData(),
Direction::kOut, index)) {
assert(oe_iter.GetData().type == label_prop.second);
builder.push_back_opt(triplet, v, nbr, oe_iter.GetData(),
Direction::kOut);
shuffle_offset.push_back(index);
}
oe_iter.Next();
}
}
if (label == triplet.dst_label) {
auto ie_iter = graph.GetInEdgeIterator(
label, v, triplet.src_label, triplet.edge_label);
while (ie_iter.IsValid()) {
auto nbr = ie_iter.GetNeighbor();
if (pred(triplet, nbr, v, ie_iter.GetData(), Direction::kIn,
index)) {
assert(ie_iter.GetData().type == label_prop.second);
builder.push_back_opt(triplet, nbr, v, ie_iter.GetData(),
Direction::kIn);
shuffle_offset.push_back(index);
}
ie_iter.Next();
}
}
}
});
ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
shuffle_offset);
return ctx;
} else if (params.dir == Direction::kOut) {
auto& input_vertex_list =
*std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.v_tag));
std::vector<std::pair<LabelTriplet, PropertyType>> label_props;
for (auto& triplet : params.labels) {
auto& props = graph.schema().get_edge_properties(
triplet.src_label, triplet.dst_label, triplet.edge_label);
PropertyType pt = PropertyType::kEmpty;
if (!props.empty()) {
pt = props[0];
}
label_props.emplace_back(triplet, pt);
}
auto builder =
SDMLEdgeColumnBuilder::builder(Direction::kOut, label_props);
foreach_vertex(
input_vertex_list, [&](size_t index, label_t label, vid_t v) {
for (auto& label_prop : label_props) {
auto& triplet = label_prop.first;
if (label != triplet.src_label)
continue;
auto oe_iter = graph.GetOutEdgeIterator(
label, v, triplet.dst_label, triplet.edge_label);
while (oe_iter.IsValid()) {
auto nbr = oe_iter.GetNeighbor();
if (pred(triplet, v, nbr, oe_iter.GetData(), Direction::kOut,
index)) {
assert(oe_iter.GetData().type == label_prop.second);
builder.push_back_opt(triplet, v, nbr, oe_iter.GetData());
shuffle_offset.push_back(index);
}
oe_iter.Next();
}
}
});
ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
shuffle_offset);
return ctx;
} else if (params.dir == Direction::kIn) {
auto& input_vertex_list =
*std::dynamic_pointer_cast<IVertexColumn>(ctx.get(params.v_tag));
std::vector<std::pair<LabelTriplet, PropertyType>> label_props;
for (auto& triplet : params.labels) {
auto& props = graph.schema().get_edge_properties(
triplet.src_label, triplet.dst_label, triplet.edge_label);
PropertyType pt = PropertyType::kEmpty;
if (!props.empty()) {
pt = props[0];
}
label_props.emplace_back(triplet, pt);
}
auto builder =
SDMLEdgeColumnBuilder::builder(Direction::kIn, label_props);
foreach_vertex(
input_vertex_list, [&](size_t index, label_t label, vid_t v) {
for (auto& label_prop : label_props) {
auto& triplet = label_prop.first;
if (label != triplet.dst_label)
continue;
auto ie_iter = graph.GetInEdgeIterator(
label, v, triplet.src_label, triplet.edge_label);
while (ie_iter.IsValid()) {
auto nbr = ie_iter.GetNeighbor();
if (pred(triplet, nbr, v, ie_iter.GetData(), Direction::kIn,
index)) {
assert(ie_iter.GetData().type == label_prop.second);
builder.push_back_opt(triplet, nbr, v, ie_iter.GetData());
shuffle_offset.push_back(index);
}
ie_iter.Next();
}
}
});
ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
shuffle_offset);
return ctx;
}
}
LOG(ERROR) << "expand edge not support";
RETURN_UNSUPPORTED_ERROR("expand edge not support");
}