in interactive_engine/executor/store/mcsr/src/graph_loader.rs [197:329]
fn load_edges<R: Read>(
&mut self, src_vertex_type: LabelId, dst_vertex_type: LabelId, edge_type: LabelId,
is_src_static: bool, is_dst_static: bool, mut rdr: Reader<R>, idegree: &mut Vec<i64>,
odegree: &mut Vec<i64>, parsed_edges: &mut Vec<(I, I, Vec<Item>)>,
) {
info!("loading edge-{}-{}-{}", src_vertex_type, edge_type, dst_vertex_type);
let input_header = self
.input_schema
.get_edge_header(src_vertex_type, edge_type, dst_vertex_type)
.unwrap();
let graph_header = self
.graph_schema
.get_edge_header(src_vertex_type, edge_type, dst_vertex_type)
.unwrap();
let mut keep_set = HashSet::new();
for pair in graph_header {
keep_set.insert(pair.0.clone());
}
let mut selected = vec![false; input_header.len()];
let mut src_col_id = 0;
let mut dst_col_id = 1;
for (index, (name, _)) in input_header.iter().enumerate() {
if keep_set.contains(name) {
selected[index] = true;
}
if name == "start_id" {
src_col_id = index;
} else if name == "end_id" {
dst_col_id = index;
}
}
let src_num = self.vertex_map.vertex_num(src_vertex_type);
let dst_num = self.vertex_map.vertex_num(dst_vertex_type);
let mut parser = LDBCEdgeParser::<G>::new(src_vertex_type, dst_vertex_type, edge_type);
parser.with_endpoint_col_id(src_col_id, dst_col_id);
if is_src_static && is_dst_static {
for result in rdr.records() {
if let Ok(record) = result {
let edge_meta = parser.parse_edge_meta(&record);
if let Ok(properties) = parse_properties(&record, input_header, selected.as_slice()) {
let src_lid = self
.vertex_map
.add_corner_vertex(edge_meta.src_global_id, src_vertex_type);
if src_lid.index() < src_num {
odegree[src_lid.index()] += 1;
}
let dst_lid = self
.vertex_map
.add_corner_vertex(edge_meta.dst_global_id, dst_vertex_type);
if dst_lid.index() < dst_num {
idegree[dst_lid.index()] += 1;
}
parsed_edges.push((src_lid, dst_lid, properties));
}
}
}
} else if is_src_static && !is_dst_static {
for result in rdr.records() {
if let Ok(record) = result {
let edge_meta = parser.parse_edge_meta(&record);
if let Ok(properties) = parse_properties(&record, input_header, selected.as_slice()) {
if keep_vertex(edge_meta.src_global_id, self.peers, self.work_id)
|| keep_vertex(edge_meta.dst_global_id, self.peers, self.work_id)
{
let src_lid = self
.vertex_map
.add_corner_vertex(edge_meta.src_global_id, src_vertex_type);
if src_lid.index() < src_num {
odegree[src_lid.index()] += 1;
}
let dst_lid = self
.vertex_map
.add_corner_vertex(edge_meta.dst_global_id, dst_vertex_type);
if dst_lid.index() < dst_num {
idegree[dst_lid.index()] += 1;
}
parsed_edges.push((src_lid, dst_lid, properties));
}
}
}
}
} else if !is_src_static && is_dst_static {
for result in rdr.records() {
if let Ok(record) = result {
let edge_meta = parser.parse_edge_meta(&record);
if let Ok(properties) = parse_properties(&record, input_header, selected.as_slice()) {
if keep_vertex(edge_meta.src_global_id, self.peers, self.work_id) {
let src_lid = self
.vertex_map
.add_corner_vertex(edge_meta.src_global_id, src_vertex_type);
if src_lid.index() < src_num {
odegree[src_lid.index()] += 1;
}
let dst_lid = self
.vertex_map
.add_corner_vertex(edge_meta.dst_global_id, dst_vertex_type);
if dst_lid.index() < dst_num {
idegree[dst_lid.index()] += 1;
}
parsed_edges.push((src_lid, dst_lid, properties));
}
}
}
}
} else {
for result in rdr.records() {
if let Ok(record) = result {
let edge_meta = parser.parse_edge_meta(&record);
if let Ok(properties) = parse_properties(&record, input_header, selected.as_slice()) {
if keep_vertex(edge_meta.src_global_id, self.peers, self.work_id)
|| keep_vertex(edge_meta.dst_global_id, self.peers, self.work_id)
{
let src_lid = self
.vertex_map
.add_corner_vertex(edge_meta.src_global_id, src_vertex_type);
if src_lid.index() < src_num {
odegree[src_lid.index()] += 1;
}
let dst_lid = self
.vertex_map
.add_corner_vertex(edge_meta.dst_global_id, dst_vertex_type);
if dst_lid.index() < dst_num {
idegree[dst_lid.index()] += 1;
}
parsed_edges.push((src_lid, dst_lid, properties));
}
}
}
}
}
}