in interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/exchange.rs [293:523]
/// Push a batch into this exchange channel, routing records to downstream
/// workers via `self.route`.
///
/// Visible behavior:
/// - If this tag already has blocked entries, the batch is queued behind them
///   (preserving order) and `WouldBlock` is returned.
/// - With a single downstream push there is no routing: the batch is forwarded
///   directly, after dropping data of canceled scopes at this channel's level.
/// - An empty batch only matters for its end-of-scope signal, which is
///   propagated, merged, or ignored depending on the end's peer set and
///   whether this worker (`self.src`) participates.
/// - A one-record batch is routed without re-batching; larger batches are
///   handled by `push_inner`.
///
/// # Errors
/// Returns `IOErrorKind::Internal` on protocol violations (non-zero seq where
/// 0 is required, pushing without peer permission, scope-level mismatch), and
/// propagates errors/`WouldBlock` from the underlying pushes.
fn push(&mut self, mut batch: MicroBatch<D>) -> Result<(), IOError> {
    // Keep per-tag ordering: if earlier pushes of this tag blocked, enqueue
    // behind them instead of overtaking.
    if !self.blocks.is_empty() {
        if let Some(b) = self.blocks.get_mut(&batch.tag) {
            warn_worker!(
                "output[{:?}] block pushing batch of {:?} to channel[{}] ;",
                self.port,
                batch.tag,
                self.index
            );
            b.push_back(BlockEntry::Batch(batch));
            return would_block!("no buffer available in exchange;");
        }
    }
    let level = batch.tag.len() as u32;
    // Fast path: a single downstream target needs no routing.
    if self.pushes.len() == 1 {
        if level == self.scope_level && self.cancel_handle.is_canceled(&batch.tag, 0) {
            batch.clear();
            // A canceled batch must still deliver its end signal if present.
            if !batch.is_last() {
                return Ok(());
            }
        }
        return self.pushes[0].push(batch);
    }
    let len = batch.len();
    if len == 0 {
        // Empty batch: only the end-of-scope signal (if any) is meaningful.
        if let Some(end) = batch.take_end() {
            if level == self.scope_level {
                if end.peers().value() == 0 {
                    // TODO: seems unreachable;
                    if batch.get_seq() != 0 {
                        let mut err = IOError::new(IOErrorKind::Internal);
                        err.set_io_cause(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Batch seq is not 0",
                        ));
                        return Err(err);
                    }
                    // Handle an empty stream: a single owner worker emits the
                    // end signal on behalf of the scope; others ignore it.
                    let mut owner = 0;
                    if level > 0 {
                        owner = self
                            .route
                            .magic
                            .exec(end.tag.current_uncheck() as u64)
                            as u32;
                    };
                    if self.src == owner {
                        for p in self.pushes.iter_mut() {
                            let mut new_end = end.clone();
                            new_end.total_send = 0;
                            new_end.global_total_send = 0;
                            new_end.update_peers(DynPeers::single(self.src), self.total_peers);
                            p.push_end(new_end, DynPeers::single(self.src))?;
                        }
                    } else {
                        trace_worker!(
                            "output[{:?}]: ignore end of scope {:?} as peers = {:?}",
                            self.port,
                            end.tag,
                            end.peers()
                        );
                    }
                    return Ok(());
                }
                if batch.get_seq() == 0 {
                    // it's the first batch need to be pushed on this port, and it's an empty batch;
                    if !end.peers_contains(self.src) {
                        trace_worker!(
                            "output[{:?}]: ignore end of scope {:?} as peers = {:?}",
                            self.port,
                            end.tag,
                            end.peers()
                        );
                        return Ok(());
                    }
                    for p in self.pushes.iter_mut() {
                        let mut new_end = end.clone();
                        new_end.total_send = 0;
                        new_end.global_total_send = 0;
                        // Root scope ends are synced with the full peer set;
                        // nested scopes with an empty child set.
                        if end.tag.is_root() {
                            p.sync_end(new_end, DynPeers::all(self.total_peers))?;
                        } else {
                            p.sync_end(new_end, DynPeers::empty())?;
                        }
                    }
                } else {
                    // Not the first batch: this worker must be a registered
                    // peer of the scope to emit its end.
                    if !end.peers_contains(self.src) {
                        let mut err = IOError::new(IOErrorKind::Internal);
                        err.set_io_cause(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            format!(
                                "Push illegal data without permission, peers: {:?}, src: {};",
                                end.peers(),
                                self.src
                            ),
                        ));
                        return Err(err);
                    }
                    // Flush buffered data of this tag before syncing the end,
                    // so send counts in `update_end` are final.
                    self.flush_last_buffer(batch.tag())?;
                    for (i, (t, g, children)) in self.update_end(None, &end).enumerate() {
                        let mut new_end = end.clone();
                        new_end.total_send = t;
                        new_end.global_total_send = g;
                        self.pushes[i].sync_end(new_end, children)?;
                    }
                };
            } else {
                // this is an end of parent scope;
                for p in self.pushes.iter_mut() {
                    let mut new_end = end.clone();
                    new_end.total_send = 0;
                    new_end.global_total_send = 0;
                    p.sync_end(new_end, DynPeers::all(self.total_peers))?;
                }
            }
        } else {
            warn_worker!("output[{:?}]: ignore empty batch of {:?};", self.port, batch.tag);
        }
        Ok(())
    } else if len == 1 {
        // only one data, not need re-batching;
        if level != self.scope_level {
            let mut err = IOError::new(IOErrorKind::Internal);
            err.set_io_cause(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("scope_level in batch is not equal to that in channel, scope_level in batch: {}, scope_level in channel: {}", level, self.scope_level)
            ));
            return Err(err);
        }
        if let Some(end) = batch.take_end() {
            if !end.peers_contains(self.src) {
                let mut err = IOError::new(IOErrorKind::Internal);
                err.set_io_cause(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!(
                        "Push illegal data without permission, peers: {:?}, src: {};",
                        end.peers(),
                        self.src
                    ),
                ));
                return Err(err);
            }
            let x = batch
                .get(0)
                .expect("expect at least one entry as len = 1");
            let target = self.route.route(x)? as usize;
            if batch.get_seq() == 0 {
                // multi source;
                self.pushes[target].push(batch)?;
                let children = DynPeers::all(self.total_peers);
                for i in 0..self.pushes.len() {
                    let mut new_end = end.clone();
                    // Only the routed target actually received the record.
                    new_end.total_send = if i == target { 1 } else { 0 };
                    new_end.global_total_send = 1;
                    // BUGFIX: previously `end.clone()` was synced here, which
                    // discarded the per-push send counts computed in `new_end`
                    // (it was assigned but never read). Sync `new_end`, as the
                    // analogous branches below already do.
                    self.pushes[i].sync_end(new_end, children.clone())?;
                }
            } else {
                // flush previous buffered data;
                self.flush_last_buffer(&batch.tag)?;
                let result = self.update_end(Some(target), &end);
                if end.peers().value() == 1 {
                    // Single-peer scope: attach the end directly to the batch
                    // pushed to the target; others get a bare end.
                    for (i, (t, g, children)) in result.enumerate() {
                        let mut new_end = end.clone();
                        new_end.total_send = t;
                        new_end.global_total_send = g;
                        if i == target {
                            let mut batch = std::mem::replace(&mut batch, MicroBatch::empty());
                            new_end.update_peers(children, self.total_peers);
                            batch.set_end(new_end);
                            self.pushes[i].push(batch)?;
                        } else {
                            self.pushes[i].push_end(new_end, children)?;
                        }
                    }
                } else {
                    self.pushes[target].push(batch)?;
                    for (i, (t, g, children)) in result.into_iter().enumerate() {
                        let mut new_end = end.clone();
                        new_end.total_send = t;
                        new_end.global_total_send = g;
                        self.pushes[i].sync_end(new_end, children)?;
                    }
                }
            }
        } else {
            // No end signal: buffer the single record toward its target; a
            // full buffer is flushed, an exhausted pool blocks the tag.
            let item = batch
                .next()
                .expect("expect at least one entry as len = 1");
            let target = self.route.route(&item)? as usize;
            match self.buffers[target].push(&batch.tag, item) {
                Ok(Some(buf)) => self.push_to(target, batch.tag().clone(), buf)?,
                Ok(None) => (),
                Err(e) => {
                    if let Some(x) = e.0 {
                        // Remember the un-buffered record so it is re-tried
                        // before any later batch of this tag.
                        self.blocks
                            .get_mut_or_insert(&batch.tag)
                            .push_back(BlockEntry::Single((target, x)));
                        trace_worker!(
                            "output[{:?}] blocked when push data of {:?} ;",
                            self.port,
                            batch.tag,
                        );
                    }
                    would_block!("no buffer available in exchange;")?;
                }
            }
        }
        Ok(())
    } else {
        if level != self.scope_level {
            let mut err = IOError::new(IOErrorKind::Internal);
            err.set_io_cause(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("scope_level in batch is not equal to that in channel, scope_level in batch: {}, scope_level in channel: {}", level, self.scope_level)
            ));
            return Err(err);
        }
        // General case: multi-record batch, handled by the re-batching path.
        self.push_inner(batch)
    }
}