in interactive_engine/executor/store/bmcsr/src/bmcsr.rs [613:791]
fn insert_edges_with_prop(
&mut self, vertex_num: usize, edges: &Vec<(I, I)>, edges_prop: &ColTable, reverse: bool, p: u32,
old_table: ColTable,
) -> ColTable {
let mut new_degree = vec![0; vertex_num];
if reverse {
for e in edges.iter() {
new_degree[e.1.index()] += 1;
}
} else {
for e in edges.iter() {
new_degree[e.0.index()] += 1;
}
}
let mut new_table = old_table.new_empty();
new_table.resize(self.edge_num + edges.len());
let num_threads = p as usize;
let old_vertex_num = self.offsets.len();
let chunk_size = (((vertex_num + num_threads - 1) / num_threads) + 3) / 4;
let chunk_num = (vertex_num + chunk_size - 1) / chunk_size;
let mut chunk_offset = vec![0_usize; chunk_num];
let safe_chunk_offset_ptr = SafeMutPtr::new(&mut chunk_offset);
let mut new_offsets = ArrayType::with_capacity(vertex_num);
new_offsets.resize(vertex_num, 0);
let safe_new_offsets_ptr = SafeMutPtr::new(&mut new_offsets);
let safe_new_degree_ptr = SafeMutPtr::new(&mut new_degree);
let safe_degree_ptr = SafePtr::new(&self.degree);
let chunk_i = AtomicUsize::new(0);
rayon::scope(|s| {
for _ in 0..num_threads {
s.spawn(|_| {
let chunk_i_ref = &chunk_i;
let new_offsets_ref = safe_new_offsets_ptr.get_mut();
let chunk_offset_ref = safe_chunk_offset_ptr.get_mut();
let degree_ref = safe_degree_ptr.get_ref();
let new_degree_ref = safe_new_degree_ptr.get_mut();
loop {
let cur_chunk = chunk_i_ref.fetch_add(1, Ordering::Relaxed);
if cur_chunk >= chunk_num {
break;
}
let mut local_offset = 0_usize;
let start_idx = cur_chunk * chunk_size;
let end_idx = vertex_num.min(start_idx + chunk_size);
if end_idx > old_vertex_num {
if start_idx >= old_vertex_num {
for v in start_idx..end_idx {
new_offsets_ref[v] = local_offset;
local_offset += (new_degree[v]) as usize;
}
} else {
for v in start_idx..old_vertex_num {
new_offsets_ref[v] = local_offset;
local_offset += (degree_ref[v] + new_degree_ref[v]) as usize;
}
for v in old_vertex_num..end_idx {
new_offsets_ref[v] = local_offset;
local_offset += (new_degree[v]) as usize;
}
}
} else {
for v in start_idx..end_idx {
new_offsets_ref[v] = local_offset;
local_offset += (degree_ref[v] + new_degree_ref[v]) as usize;
}
}
chunk_offset_ref[cur_chunk] = local_offset;
}
});
}
});
let mut cur_offset = 0_usize;
for i in 0..chunk_num {
let tmp = chunk_offset[i] + cur_offset;
chunk_offset[i] = cur_offset;
cur_offset = tmp;
}
let mut new_neighbors = ArrayType::with_capacity(cur_offset);
new_neighbors.resize(cur_offset, I::new(0));
let safe_new_neighbors_ptr = SafeMutPtr::new(&mut new_neighbors);
let safe_neighbors_ptr = SafePtr::new(&self.neighbors);
let safe_offsets_ptr = SafePtr::new(&self.offsets);
let safe_new_table_ptr = SafeMutPtr::new(&mut new_table);
let safe_old_table_ptr = SafePtr::new(&old_table);
let chunk_i = AtomicUsize::new(0);
rayon::scope(|s| {
for _ in 0..num_threads {
s.spawn(|_| {
let chunk_i_ref = &chunk_i;
let neighbors_ref = safe_neighbors_ptr.get_ref();
let new_neighbors_ref = safe_new_neighbors_ptr.get_mut();
let new_offsets_ref = safe_new_offsets_ptr.get_mut();
let chunk_offset_ref = safe_chunk_offset_ptr.get_mut();
let offsets_ref = safe_offsets_ptr.get_ref();
let degree_ref = safe_degree_ptr.get_ref();
let new_table_ref = safe_new_table_ptr.get_mut();
let old_table_ref = safe_old_table_ptr.get_ref();
loop {
let cur_chunk = chunk_i_ref.fetch_add(1, Ordering::Relaxed);
if cur_chunk >= chunk_num {
break;
}
let local_offset = chunk_offset_ref[cur_chunk];
let start_idx = cur_chunk * chunk_size;
let end_idx = vertex_num.min(start_idx + chunk_size);
if end_idx > old_vertex_num {
if start_idx >= old_vertex_num {
for v in start_idx..end_idx {
new_offsets_ref[v] += local_offset;
}
} else {
for v in start_idx..old_vertex_num {
let offset = new_offsets_ref[v] + local_offset;
new_offsets_ref[v] = offset;
let old_offset = offsets_ref[v];
let deg = degree_ref[v] as usize;
new_neighbors_ref[offset..offset + deg]
.copy_from_slice(&neighbors_ref[old_offset..old_offset + deg]);
new_table_ref.copy_range(offset, old_table_ref, old_offset, deg);
}
for v in old_vertex_num..end_idx {
new_offsets_ref[v] += local_offset;
}
}
} else {
for v in start_idx..end_idx {
let offset = new_offsets_ref[v] + local_offset;
new_offsets_ref[v] = offset;
let old_offset = offsets_ref[v];
let deg = degree_ref[v] as usize;
new_neighbors_ref[offset..offset + deg]
.copy_from_slice(&neighbors_ref[old_offset..old_offset + deg]);
new_table_ref.copy_range(offset, old_table_ref, old_offset, deg);
}
}
}
});
}
});
self.degree.resize(vertex_num, 0);
let new_degree = &mut self.degree;
if reverse {
for (row_i, (src, dst)) in edges.iter().enumerate() {
let offset = new_offsets[dst.index()] + new_degree[dst.index()] as usize;
new_degree[dst.index()] += 1;
new_neighbors[offset] = *src;
new_table.set_table_row(offset, edges_prop, row_i);
}
} else {
for (row_i, (src, dst)) in edges.iter().enumerate() {
let offset = new_offsets[src.index()] + new_degree[src.index()] as usize;
new_degree[src.index()] += 1;
new_neighbors[offset] = *dst;
new_table.set_table_row(offset, edges_prop, row_i);
}
}
self.neighbors = new_neighbors;
self.offsets = new_offsets;
self.edge_num = cur_offset;
new_table
}