in interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/switch.rs [71:235]
/// Routes batches arriving on the two inputs of the iteration switch to its two outputs.
///
/// `outputs[0]` is wrapped as `leave` and `outputs[1]` as `enter`; `inputs[0]` is read as the
/// main input and `inputs[1]` as the feedback input.
///
/// Main input (`inputs[0]`), for each batch:
/// * With an until-condition (`self.cond.has_until_cond()`), every datum is tested with
///   `self.cond.is_converge`: a converged datum is given to `leave`; any other datum is given to
///   `enter`, and additionally to `leave` when `self.emit_kind == Some(EmitKind::Before)`.
/// * Without an until-condition the whole batch is pushed to `enter` (a `share()`d copy goes to
///   `enter` and the batch itself to `leave` when `self.emit_kind == Some(EmitKind::Before)`).
/// * If the batch carries an end, a fresh `IterateState` is inserted under the batch tag's
///   parent and the end is forwarded to `enter`.
///
/// Feedback input (`inputs[1]`), for each batch (`self.has_synchronized` is set to `true`):
/// * When `batch.tag.current_uncheck() >= self.cond.max_iters`, remaining data is pushed to
///   `leave`. If the batch carried an end, the state stored under the parent tag is removed and
///   `leave_iteration()` is called on it; an end taken from that state is forwarded to `leave`
///   (unless its tag is root) and to `enter`, otherwise the state is re-inserted with a warning.
///   Once `self.iterate_states` is empty, all ends in `self.parent_parent_scope_ends` are
///   drained, last entry first, and notified directly on `outputs[0]` (non-root only) and
///   `outputs[1]`.
/// * Otherwise data is routed like the main input, except that the extra emission to `leave`
///   happens whenever `self.emit_kind.is_some()`, and an `IterateState` is inserted for the
///   parent tag only if none is present yet.
///
/// # Errors
///
/// Propagates any error returned by the input sessions, the output sessions/handles, or
/// `self.cond.is_converge`.
///
/// # Panics
///
/// Panics if `inputs` or `outputs` has fewer than two elements, or if a feedback batch that
/// reached `max_iters` carries an end while no state is stored for its parent tag.
fn on_receive(
    &mut self, inputs: &[Box<dyn InputProxy>], outputs: &[Box<dyn OutputProxy>],
) -> Result<(), JobExecError> {
    let leave = new_output::<D>(&outputs[0]);
    let enter = new_output::<D>(&outputs[1]);
    let mut main = new_input_session::<D>(&inputs[0]);
    main.for_each_batch(|batch| {
        if !batch.is_empty() {
            if self.cond.has_until_cond() {
                let mut leave = leave.new_session(&batch.tag)?;
                let mut enter = enter.new_session(&batch.tag)?;
                for data in batch.drain() {
                    if self.cond.is_converge(&data)? {
                        leave.give(data)?;
                    } else if self.emit_kind == Some(EmitKind::Before) {
                        match enter.give(data.clone()) {
                            Err(e) => {
                                // On a would-block error the datum is still emitted to `leave`
                                // before the error is returned.
                                // NOTE(review): presumably `enter` has already accepted the
                                // datum in that case; confirm against the output session.
                                if e.is_would_block() {
                                    leave.give(data)?;
                                }
                                Err(e)
                            }
                            Ok(()) => leave.give(data),
                        }?;
                    } else {
                        enter.give(data)?;
                    }
                }
            } else {
                // Move the payload out of `batch`, then copy tag, sequence and end back so the
                // end handling below still sees them on `batch`.
                let mut re = std::mem::replace(batch, MicroBatch::empty());
                batch.set_tag(re.tag.clone());
                batch.set_seq(re.get_seq());
                if let Some(e) = re.take_end() {
                    batch.set_end(e);
                }
                if self.emit_kind == Some(EmitKind::Before) {
                    match enter.push_batch(re.share()) {
                        Err(e) => {
                            // Same would-block handling as for single data above.
                            if e.is_would_block() {
                                leave.push_batch(re)?;
                            }
                            Err(e)
                        }
                        Ok(()) => leave.push_batch(re),
                    }?;
                } else {
                    enter.push_batch(re)?;
                }
            }
        }
        if let Some(end) = batch.take_end() {
            let p = batch.tag.to_parent_uncheck();
            trace_worker!("detect scope {:?} in iteration;", batch.tag);
            self.iterate_states
                .insert(p, IterateState::new());
            enter.notify_end(end)?;
        }
        Ok(())
    })?;
    let mut feedback_sync = new_input_session::<D>(&inputs[1]);
    feedback_sync.for_each_batch(|batch| {
        self.has_synchronized = true;
        if batch.tag.current_uncheck() >= self.cond.max_iters {
            // The batch's own end is detached here and only used as a signal below.
            let end = batch.take_end();
            if !batch.is_empty() {
                leave.push_batch_mut(batch)?;
            }
            if end.is_some() {
                trace_worker!("detect {:?} leave iteration;", batch.tag);
                let p = batch.tag.to_parent_uncheck();
                if let Some(mut state) = self.iterate_states.remove(&p) {
                    state.leave_iteration();
                    if let Some(end) = state.take_end() {
                        if !end.tag.is_root() {
                            leave.notify_end(end.clone())?;
                        }
                        enter.notify_end(end)?;
                    } else {
                        warn_worker!("{:?} not end while {:?} leave iteration;", p, batch.tag);
                        // Keep the state: it was removed above only to take ownership of it.
                        self.iterate_states.insert(p, state);
                    }
                } else {
                    error_worker!("iteration for {:?} not found;", p);
                    panic!("iteration for {:?} not found", p);
                }
                if self.iterate_states.is_empty() {
                    trace_worker!("detect no scope in iteration;");
                    // Flush the held-back ends, walking the outer collection back to front.
                    for ends in self.parent_parent_scope_ends.iter_mut().rev() {
                        for end in ends.drain(..) {
                            if !end.tag.is_root() {
                                outputs[0].notify_end(end.clone())?;
                            }
                            outputs[1].notify_end(end)?;
                        }
                    }
                }
            }
        } else {
            if self.cond.has_until_cond() {
                let mut leave = leave.new_session(&batch.tag)?;
                let mut enter = enter.new_session(&batch.tag)?;
                for data in batch.drain() {
                    if self.cond.is_converge(&data)? {
                        leave.give(data)?;
                    } else if self.emit_kind.is_some() {
                        match enter.give(data.clone()) {
                            Err(e) => {
                                // Same would-block handling as on the main input.
                                if e.is_would_block() {
                                    leave.give(data)?;
                                }
                                Err(e)
                            }
                            Ok(()) => leave.give(data),
                        }?;
                    } else {
                        enter.give(data)?;
                    }
                }
            } else {
                // Move the payload out of `batch`, then copy tag, sequence and end back so the
                // end handling below still sees them on `batch`.
                let mut re = std::mem::replace(batch, MicroBatch::empty());
                batch.set_tag(re.tag.clone());
                batch.set_seq(re.get_seq());
                if let Some(end) = re.take_end() {
                    batch.set_end(end);
                }
                if self.emit_kind.is_some() {
                    match enter.push_batch(re.share()) {
                        Err(e) => {
                            if e.is_would_block() {
                                leave.push_batch(re)?;
                            }
                            Err(e)
                        }
                        Ok(()) => leave.push_batch(re),
                    }?;
                } else {
                    enter.push_batch(re)?;
                }
            }
            if let Some(end) = batch.take_end() {
                let p = batch.tag.to_parent_uncheck();
                // Unlike the main input, an existing state for this parent is left in place.
                if !self.iterate_states.contains_key(&p) {
                    trace_worker!("detect scope {:?} in iteration;", batch.tag);
                    self.iterate_states
                        .insert(p, IterateState::new());
                }
                enter.notify_end(end)?;
            }
        }
        Ok(())
    })
}