in interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/count.rs [8:167]
/// Counts the data items of every scope (identified by its tag) and emits the
/// count as a single `u64` per scope.
///
/// With more than one partition the count is computed in two phases:
/// 1. `count_local` counts the items seen by each worker.
/// 2. The partial counts are routed with `aggregate()`.
/// 3. `count_global` sums them.
///
/// With a single partition, one `count_global` operator counts directly.
///
/// A scope that ends without any data still yields an explicit count of `0`. That zero
/// is emitted by each worker listed as a source of the scope's end signal. Every other
/// worker only forwards the end signal.
///
/// # Errors
/// Returns a [`BuildJobError`] if one of the underlying operators cannot be built.
fn count(self) -> Result<SingleItem<u64>, BuildJobError> {
    let worker_id = self.get_worker_id().index;
    let total_peers = self.get_worker_id().total_peers();
    if self.get_partitions() > 1 {
        // For the end signal of a nested scope (non-empty tag), register the worker that
        // owns the scope (`tag.current % total_peers`) as one of the signal's sources.
        // End signals of the root scope are returned unchanged. A macro (rather than a
        // closure) lets the end-signal type be inferred at each call site.
        // NOTE(review): presumably this matches how `aggregate()` routes nested-scope
        // data to that owner worker, so the owner always reports a partial count —
        // confirm against `aggregate`.
        macro_rules! add_owner_source {
            ($end:expr, $tag:expr, $total_peers:expr) => {{
                let mut signal = $end;
                if signal.tag.len() > 0 {
                    let mut peers = signal.peers().clone();
                    peers.add_source($tag.current_uncheck() % $total_peers);
                    signal.update_peers(peers, $total_peers);
                }
                signal
            }};
        }

        let mut stream = self.unary("count_local", |info| {
            // Running per-scope count for scopes whose end has not arrived yet.
            let mut table = TidyTagMap::<u64>::new(info.scope_level);
            move |input, output| {
                input.for_each_batch(|batch| {
                    if !batch.is_empty() {
                        let cnt = table.remove(&batch.tag).unwrap_or(0) + batch.len() as u64;
                        batch.clear();
                        if let Some(end) = batch.take_end() {
                            let end = add_owner_source!(end, batch.tag, total_peers);
                            let mut session = output.new_session(&batch.tag)?;
                            trace_worker!("local count {} of {:?}", cnt, batch.tag);
                            session.give_last(cnt, end)?;
                        } else {
                            table.insert(batch.tag.clone(), cnt);
                        }
                        return Ok(());
                    }
                    if let Some(end) = batch.take_end() {
                        let end = add_owner_source!(end, batch.tag, total_peers);
                        if let Some(cnt) = table.remove(&batch.tag) {
                            let mut session = output.new_session(&batch.tag)?;
                            trace_worker!("local count {} of {:?}", cnt, batch.tag);
                            session.give_last(cnt, end)?;
                        } else if end.contains_source(worker_id, total_peers) {
                            // No data for this scope on this worker, but it is a source
                            // of the end signal: report an explicit zero.
                            let mut session = output.new_session(&batch.tag)?;
                            trace_worker!("local count {} of {:?}", 0, batch.tag);
                            session.give_last(0, end)?;
                        } else {
                            output.notify_end(end)?;
                        }
                    }
                    Ok(())
                })
            }
        })?;
        // Each partial count is a single item; send it in batches of size/capacity 1.
        stream
            .set_upstream_batch_size(1)
            .set_upstream_batch_capacity(1);
        let counts = stream
            .aggregate()
            .unary("count_global", |info| {
                // Running per-scope sum of the partial counts received so far.
                let mut table = TidyTagMap::<u64>::new(info.scope_level);
                move |input, output| {
                    input.for_each_batch(|batch| {
                        if !batch.is_empty() {
                            let mut sum = table.remove(&batch.tag).unwrap_or(0);
                            for d in batch.drain() {
                                sum += d;
                            }
                            if let Some(end) = batch.take_end() {
                                let mut session = output.new_session(&batch.tag)?;
                                trace_worker!("emit global count = {} of {:?};", sum, end.tag);
                                session.give_last(Single(sum), end)?;
                            } else {
                                table.insert(batch.tag.clone(), sum);
                            }
                            return Ok(());
                        }
                        if let Some(end) = batch.take_end() {
                            if let Some(sum) = table.remove(&end.tag) {
                                let mut session = output.new_session(&batch.tag)?;
                                trace_worker!("emit global count = {} of {:?};", sum, end.tag);
                                session.give_last(Single(sum), end)?;
                            } else if end.contains_source(worker_id, total_peers) {
                                // Nothing was received for this scope, but this worker is a
                                // source of the end signal: emit an explicit zero.
                                let mut session = output.new_session(&batch.tag)?;
                                trace_worker!("emit global count = {} of {:?};", 0, end.tag);
                                session.give_last(Single(0), end)?;
                            } else {
                                output.notify_end(end)?;
                            }
                        }
                        Ok(())
                    })
                }
            })?;
        Ok(SingleItem::new(counts))
    } else {
        let stream = self.unary("count_global", |info| {
            // Running per-scope count for scopes whose end has not arrived yet.
            let mut table = TidyTagMap::<u64>::new(info.scope_level);
            move |input, output| {
                input.for_each_batch(|batch| {
                    if !batch.is_empty() {
                        let cnt = table.remove(&batch.tag).unwrap_or(0) + batch.len() as u64;
                        batch.clear();
                        if let Some(end) = batch.take_end() {
                            let mut session = output.new_session(&batch.tag)?;
                            trace_worker!("global count {} of {:?}", cnt, batch.tag);
                            session.give_last(Single(cnt), end)?;
                        } else {
                            table.insert(batch.tag.clone(), cnt);
                        }
                        return Ok(());
                    }
                    if let Some(end) = batch.take_end() {
                        if let Some(cnt) = table.remove(&batch.tag) {
                            let mut session = output.new_session(&batch.tag)?;
                            trace_worker!("global count {} of {:?}", cnt, batch.tag);
                            session.give_last(Single(cnt), end)?;
                        } else if end.contains_source(worker_id, total_peers) {
                            // Empty scope on a source worker: emit an explicit zero.
                            let mut session = output.new_session(&batch.tag)?;
                            trace_worker!("global count {} of {:?}", 0, batch.tag);
                            session.give_last(Single(0), end)?;
                        } else {
                            output.notify_end(end)?;
                        }
                    }
                    Ok(())
                })
            }
        })?;
        Ok(SingleItem::new(stream))
    }
}