in flex/engines/graph_db/runtime/common/operators/retrieve/get_v.h [140:450]
static bl::result<Context> get_vertex_from_edges(
    const GraphReadInterface& graph, Context&& ctx, const GetVParams& params,
    const PRED_T& pred) {
  // Builds a vertex column from the path/edge column stored at `params.tag`
  // and stores it in `ctx` under `params.alias`.
  //
  // Every row that contributes a vertex has its input index appended to
  // `shuffle_offset`; `set_with_reshuffle` receives those offsets together
  // with the new column, so rows that are filtered out do not appear in it.
  //
  // Dispatch:
  //   * path column          -> end vertex of every path (multi-label column)
  //   * optional edge column -> get_vertex_from_edges_optional_impl
  //   * kSDSL / kSDML / kBDSL / kBDML edge columns -> handled below
  // Anything else yields an "unsupported" error.
  //
  // NOTE(review): `pred` is only evaluated in the kSDSL branch; the kSDML,
  // kBDSL and kBDML branches filter by label only. Confirm that callers never
  // pass a non-trivial predicate for those column types.
  std::vector<size_t> shuffle_offset;
  auto col = ctx.get(params.tag);
  if (!col) {
    // Guard against a missing input column instead of dereferencing null.
    LOG(ERROR) << "GetV input column is null";
    RETURN_BAD_REQUEST_ERROR("GetV input column is null");
  }
  if (col->column_type() == ContextColumnType::kPath) {
    auto& input_path_list =
        *std::dynamic_pointer_cast<GeneralPathColumn>(col);
    auto builder = MLVertexColumnBuilder::builder();
    input_path_list.foreach_path([&](size_t index, const Path& path) {
      auto [label, vid] = path.get_end();
      builder.push_back_vertex({label, vid});
      shuffle_offset.push_back(index);
    });
    ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                           shuffle_offset);
    return ctx;
  }

  // Reuse the column fetched above rather than looking it up a second time.
  auto column = std::dynamic_pointer_cast<IEdgeColumn>(col);
  if (!column) {
    LOG(ERROR) << "Unsupported column type: "
               << static_cast<int>(col->column_type());
    RETURN_UNSUPPORTED_ERROR(
        "Unsupported column type: " +
        std::to_string(static_cast<int>(col->column_type())));
  }
  if (column->is_optional()) {
    return get_vertex_from_edges_optional_impl(graph, std::move(ctx), params,
                                               pred);
  }

  if (column->edge_column_type() == EdgeColumnType::kSDSL) {
    auto& input_edge_list =
        *std::dynamic_pointer_cast<SDSLEdgeColumn>(column);
    label_t output_vertex_label{0};
    auto edge_label = input_edge_list.get_labels()[0];
    // Resolve kOther into a concrete endpoint using the column's direction.
    VOpt opt = params.opt;
    if (params.opt == VOpt::kOther) {
      if (input_edge_list.dir() == Direction::kOut) {
        opt = VOpt::kEnd;
      } else {
        opt = VOpt::kStart;
      }
    }
    if (opt == VOpt::kStart) {
      output_vertex_label = edge_label.src_label;
    } else if (opt == VOpt::kEnd) {
      output_vertex_label = edge_label.dst_label;
    } else {
      LOG(ERROR) << "not support GetV opt " << static_cast<int>(opt);
      RETURN_UNSUPPORTED_ERROR("not support GetV opt " +
                               std::to_string(static_cast<int>(opt)));
    }
    // params tables size may be 0
    if (params.tables.size() == 1) {
      if (output_vertex_label != params.tables[0]) {
        LOG(ERROR) << "output_vertex_label != params.tables[0]: "
                   << static_cast<int>(output_vertex_label) << " "
                   << static_cast<int>(params.tables[0]);
        RETURN_BAD_REQUEST_ERROR("output_vertex_label != params.tables[0]");
      }
    }
    auto builder = SLVertexColumnBuilder::builder(output_vertex_label);
    if (opt == VOpt::kStart) {
      input_edge_list.foreach_edge(
          [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
              const EdgeData& edata, Direction dir) {
            if (pred(label.src_label, src, index)) {
              builder.push_back_opt(src);
              shuffle_offset.push_back(index);
            }
          });
    } else if (opt == VOpt::kEnd) {
      input_edge_list.foreach_edge(
          [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
              const EdgeData& edata, Direction dir) {
            if (pred(label.dst_label, dst, index)) {
              builder.push_back_opt(dst);
              shuffle_offset.push_back(index);
            }
          });
    }
    ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                           shuffle_offset);
    return ctx;
  } else if (column->edge_column_type() == EdgeColumnType::kSDML) {
    auto& input_edge_list =
        *std::dynamic_pointer_cast<SDMLEdgeColumn>(column);
    // Resolve kOther into a concrete endpoint using the column's direction.
    VOpt opt = params.opt;
    if (params.opt == VOpt::kOther) {
      if (input_edge_list.dir() == Direction::kOut) {
        opt = VOpt::kEnd;
      } else {
        opt = VOpt::kStart;
      }
    }
    auto labels =
        extract_labels(input_edge_list.get_labels(), params.tables, opt);
    if (labels.size() == 0) {
      // No vertex label survives the filter: the result is empty.
      auto builder = MLVertexColumnBuilder::builder();
      ctx.set_with_reshuffle(params.alias, builder.finish(nullptr), {});
      return ctx;
    }
    if (labels.size() > 1) {
      // NOTE(review): an opt other than kStart/kEnd silently produces an
      // empty column here, whereas the kSDSL branch reports an error.
      auto builder = MLVertexColumnBuilder::builder();
      if (opt == VOpt::kStart) {
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (std::find(labels.begin(), labels.end(), label.src_label) !=
                  labels.end()) {
                builder.push_back_vertex({label.src_label, src});
                shuffle_offset.push_back(index);
              }
            });
      } else if (opt == VOpt::kEnd) {
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (std::find(labels.begin(), labels.end(), label.dst_label) !=
                  labels.end()) {
                builder.push_back_vertex({label.dst_label, dst});
                shuffle_offset.push_back(index);
              }
            });
      }
      ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                             shuffle_offset);
      return ctx;
    }
    // Exactly one vertex label survives the filter. Previously this case fell
    // through to the "Unsupported edge column type" error at the bottom of the
    // function; emit a single-label column instead, mirroring what the
    // kBDSL/kBDML branches do when only one label is requested.
    const auto only_label = *labels.begin();
    auto builder = SLVertexColumnBuilder::builder(only_label);
    if (opt == VOpt::kStart) {
      input_edge_list.foreach_edge(
          [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
              const EdgeData& edata, Direction dir) {
            if (label.src_label == only_label) {
              builder.push_back_opt(src);
              shuffle_offset.push_back(index);
            }
          });
    } else if (opt == VOpt::kEnd) {
      input_edge_list.foreach_edge(
          [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
              const EdgeData& edata, Direction dir) {
            if (label.dst_label == only_label) {
              builder.push_back_opt(dst);
              shuffle_offset.push_back(index);
            }
          });
    }
    ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                           shuffle_offset);
    return ctx;
  } else if (column->edge_column_type() == EdgeColumnType::kBDSL) {
    auto& input_edge_list =
        *std::dynamic_pointer_cast<BDSLEdgeColumn>(column);
    if (params.tables.size() == 0) {
      auto type = input_edge_list.get_labels()[0];
      if (type.src_label != type.dst_label) {
        // Endpoints carry different labels, so a multi-label column is needed.
        auto builder = MLVertexColumnBuilder::builder();
        if (params.opt != VOpt::kOther) {
          LOG(ERROR) << "not support GetV opt "
                     << static_cast<int>(params.opt);
          RETURN_UNSUPPORTED_ERROR(
              "not support GetV opt " +
              std::to_string(static_cast<int>(params.opt)));
        }
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              // Pick the endpoint opposite to the traversal direction.
              if (dir == Direction::kOut) {
                builder.push_back_vertex({label.dst_label, dst});
              } else {
                builder.push_back_vertex({label.src_label, src});
              }
              shuffle_offset.push_back(index);
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
        return ctx;
      } else {
        // Both endpoints share one label: a single-label column suffices.
        auto builder = SLVertexColumnBuilder::builder(type.src_label);
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (dir == Direction::kOut) {
                builder.push_back_opt(dst);
              } else {
                builder.push_back_opt(src);
              }
              shuffle_offset.push_back(index);
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
        return ctx;
      }
    } else {
      // Keep only the requested labels that the edge triplet can produce.
      std::vector<label_t> labels;
      auto type = input_edge_list.get_labels()[0];
      for (auto& label : params.tables) {
        if (label == type.src_label || label == type.dst_label) {
          labels.push_back(label);
        }
      }
      if (labels.size() == 1) {
        auto builder = SLVertexColumnBuilder::builder(labels[0]);
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (dir == Direction::kOut) {
                if (label.dst_label == labels[0]) {
                  builder.push_back_opt(dst);
                  shuffle_offset.push_back(index);
                }
              } else {
                if (label.src_label == labels[0]) {
                  builder.push_back_opt(src);
                  shuffle_offset.push_back(index);
                }
              }
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
        return ctx;
      } else {
        // Zero or several matching labels: an empty `labels` simply yields an
        // empty multi-label column.
        auto builder = MLVertexColumnBuilder::builder();
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (dir == Direction::kOut) {
                if (std::find(labels.begin(), labels.end(),
                              label.dst_label) != labels.end()) {
                  builder.push_back_vertex({label.dst_label, dst});
                  shuffle_offset.push_back(index);
                }
              } else {
                if (std::find(labels.begin(), labels.end(),
                              label.src_label) != labels.end()) {
                  builder.push_back_vertex({label.src_label, src});
                  shuffle_offset.push_back(index);
                }
              }
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
        return ctx;
      }
    }
  } else if (column->edge_column_type() == EdgeColumnType::kBDML) {
    auto& input_edge_list =
        *std::dynamic_pointer_cast<BDMLEdgeColumn>(column);
    if (params.tables.size() == 0) {
      auto builder = MLVertexColumnBuilder::builder();
      if (params.opt != VOpt::kOther) {
        LOG(ERROR) << "not support GetV opt " << static_cast<int>(params.opt);
        RETURN_UNSUPPORTED_ERROR(
            "not support GetV opt " +
            std::to_string(static_cast<int>(params.opt)));
      }
      input_edge_list.foreach_edge(
          [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
              const EdgeData& edata, Direction dir) {
            // Pick the endpoint opposite to the traversal direction.
            if (dir == Direction::kOut) {
              builder.push_back_vertex({label.dst_label, dst});
            } else {
              builder.push_back_vertex({label.src_label, src});
            }
            shuffle_offset.push_back(index);
          });
      ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                             shuffle_offset);
      return ctx;
    } else {
      if (params.tables.size() == 1) {
        auto vlabel = params.tables[0];
        auto builder = SLVertexColumnBuilder::builder(vlabel);
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (dir == Direction::kOut) {
                if (label.dst_label == vlabel) {
                  builder.push_back_opt(dst);
                  shuffle_offset.push_back(index);
                }
              } else {
                if (label.src_label == vlabel) {
                  builder.push_back_opt(src);
                  shuffle_offset.push_back(index);
                }
              }
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
      } else {
        // Membership table indexed by vertex label id.
        std::vector<bool> labels(graph.schema().vertex_label_num(), false);
        for (auto& label : params.tables) {
          // A requested label outside the schema cannot match any endpoint;
          // skip it rather than writing past the end of the table.
          if (static_cast<size_t>(label) < labels.size()) {
            labels[label] = true;
          }
        }
        auto builder = MLVertexColumnBuilder::builder();
        input_edge_list.foreach_edge(
            [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst,
                const EdgeData& edata, Direction dir) {
              if (dir == Direction::kOut) {
                if (labels[label.dst_label]) {
                  builder.push_back_vertex({label.dst_label, dst});
                  shuffle_offset.push_back(index);
                }
              } else {
                if (labels[label.src_label]) {
                  builder.push_back_vertex({label.src_label, src});
                  shuffle_offset.push_back(index);
                }
              }
            });
        ctx.set_with_reshuffle(params.alias, builder.finish(nullptr),
                               shuffle_offset);
      }
      return ctx;
    }
  }

  // Only reached for edge column types not handled above.
  LOG(ERROR) << "Unsupported edge column type: "
             << static_cast<int>(column->edge_column_type());
  RETURN_UNSUPPORTED_ERROR(
      "Unsupported edge column type: " +
      std::to_string(static_cast<int>(column->edge_column_type())));
}