fn on_receive()

in interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/switch.rs [71:235]


    /// Routes batches arriving at the iteration switch to its two outputs.
    ///
    /// * `inputs[0]` is read as the main input, `inputs[1]` as the feedback input.
    /// * `outputs[0]` is used as the `leave` output, `outputs[1]` as the `enter` output.
    ///
    /// Main input: every non-empty batch is sent to `enter`; data for which
    /// `self.cond.is_converge` returns true is sent to `leave` instead. When
    /// `self.emit_kind == Some(EmitKind::Before)` non-converged data is additionally
    /// copied to `leave`. A batch carrying an end-of-scope registers a fresh
    /// `IterateState` under the parent tag and forwards the end to `enter`.
    ///
    /// Feedback input: marks `self.has_synchronized`. Batches whose current tag value
    /// is `>= self.cond.max_iters` are pushed to `leave`, and their end-of-scope (if
    /// any) closes the iteration state of the parent tag. Other batches are routed as
    /// on the main input, except that data is copied to `leave` whenever
    /// `self.emit_kind` is `Some(_)` (any kind, not only `Before`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the outputs, by `self.cond.is_converge`, or by
    /// the input sessions.
    ///
    /// # Panics
    ///
    /// Panics if an end-of-scope reaches the max-iteration branch for a parent tag that
    /// has no entry in `self.iterate_states`, or if `inputs` / `outputs` hold fewer
    /// than two elements (direct indexing).
    fn on_receive(
        &mut self, inputs: &[Box<dyn InputProxy>], outputs: &[Box<dyn OutputProxy>],
    ) -> Result<(), JobExecError> {
        let leave = new_output::<D>(&outputs[0]);
        let enter = new_output::<D>(&outputs[1]);
        let mut main = new_input_session::<D>(&inputs[0]);
        main.for_each_batch(|batch| {
            if !batch.is_empty() {
                if self.cond.has_until_cond() {
                    // Per-datum routing: the until-condition has to be evaluated on each item.
                    let mut leave = leave.new_session(&batch.tag)?;
                    let mut enter = enter.new_session(&batch.tag)?;
                    for data in batch.drain() {
                        if self.cond.is_converge(&data)? {
                            leave.give(data)?;
                        } else if self.emit_kind == Some(EmitKind::Before) {
                            // NOTE(review): on a would-block error the datum is still handed to
                            // `leave` before the error is propagated — presumably `enter` has
                            // already accepted the clone; confirm against the output impl.
                            match enter.give(data.clone()) {
                                Err(e) => {
                                    if e.is_would_block() {
                                        leave.give(data)?;
                                    }
                                    Err(e)
                                }
                                Ok(()) => leave.give(data),
                            }?;
                        } else {
                            enter.give(data)?;
                        }
                    }
                } else {
                    // Move the payload out of `batch`, leaving an empty placeholder that keeps
                    // the tag, sequence number and end marker so the end handling below still
                    // sees them.
                    let mut re = std::mem::replace(batch, MicroBatch::empty());
                    batch.set_tag(re.tag.clone());
                    batch.set_seq(re.get_seq());
                    if let Some(e) = re.take_end() {
                        batch.set_end(e);
                    }
                    if self.emit_kind == Some(EmitKind::Before) {
                        // Same would-block handling as the per-datum path, at batch granularity.
                        match enter.push_batch(re.share()) {
                            Err(e) => {
                                if e.is_would_block() {
                                    leave.push_batch(re)?;
                                }
                                Err(e)
                            }
                            Ok(()) => leave.push_batch(re),
                        }?;
                    } else {
                        enter.push_batch(re)?;
                    }
                }
            }

            if let Some(end) = batch.take_end() {
                // Register the scope under its parent tag and forward the end to `enter`.
                let p = batch.tag.to_parent_uncheck();
                trace_worker!("detect scope {:?} in iteration;", batch.tag);
                self.iterate_states
                    .insert(p, IterateState::new());
                enter.notify_end(end)?;
            }

            Ok(())
        })?;

        let mut feedback_sync = new_input_session::<D>(&inputs[1]);
        feedback_sync.for_each_batch(|batch| {
            self.has_synchronized = true;
            if batch.tag.current_uncheck() >= self.cond.max_iters {
                // Iteration limit reached: whatever data is left goes to `leave`.
                let end = batch.take_end();
                if !batch.is_empty() {
                    leave.push_batch_mut(batch)?;
                }
                if end.is_some() {
                    trace_worker!("detect {:?} leave iteration;", batch.tag);
                    let p = batch.tag.to_parent_uncheck();
                    if let Some(mut state) = self.iterate_states.remove(&p) {
                        state.leave_iteration();
                        if let Some(end) = state.take_end() {
                            // The end is forwarded to `leave` only for non-root tags; `enter`
                            // always receives it.
                            if !end.tag.is_root() {
                                leave.notify_end(end.clone())?;
                            }
                            enter.notify_end(end)?;
                        } else {
                            // The state yielded no end yet: put it back for later.
                            warn_worker!("{:?} not end while {:?} leave iteration;", p, batch.tag);
                            self.iterate_states.insert(p, state);
                        }
                    } else {
                        error_worker!("iteration for {:?} not found;", p);
                        panic!("iteration for {:?} not found", p);
                    }

                    if self.iterate_states.is_empty() {
                        // No scope is tracked any more: flush the stored ends, highest index
                        // first.
                        trace_worker!("detect no scope in iteration;");
                        let len = self.parent_parent_scope_ends.len();
                        for i in (0..len).rev() {
                            for end in self.parent_parent_scope_ends[i].drain(..) {
                                if !end.tag.is_root() {
                                    outputs[0].notify_end(end.clone())?;
                                }
                                outputs[1].notify_end(end)?;
                            }
                        }
                    }
                }
            } else {
                if self.cond.has_until_cond() {
                    let mut leave = leave.new_session(&batch.tag)?;
                    let mut enter = enter.new_session(&batch.tag)?;
                    for data in batch.drain() {
                        if self.cond.is_converge(&data)? {
                            leave.give(data)?;
                        } else if self.emit_kind.is_some() {
                            // On the feedback path any emit kind copies data to `leave`
                            // (the main path only does so for `EmitKind::Before`).
                            match enter.give(data.clone()) {
                                Err(e) => {
                                    if e.is_would_block() {
                                        leave.give(data)?;
                                    }
                                    Err(e)
                                }
                                Ok(()) => leave.give(data),
                            }?;
                        } else {
                            enter.give(data)?;
                        }
                    }
                } else {
                    // Move the payload out while keeping tag, sequence and end on `batch`.
                    let mut re = std::mem::replace(batch, MicroBatch::empty());
                    batch.set_tag(re.tag.clone());
                    batch.set_seq(re.get_seq());
                    if let Some(end) = re.take_end() {
                        batch.set_end(end);
                    }

                    if self.emit_kind.is_some() {
                        match enter.push_batch(re.share()) {
                            Err(e) => {
                                if e.is_would_block() {
                                    leave.push_batch(re)?;
                                }
                                Err(e)
                            }
                            Ok(()) => leave.push_batch(re),
                        }?;
                    } else {
                        enter.push_batch(re)?;
                    }
                }

                if let Some(end) = batch.take_end() {
                    let p = batch.tag.to_parent_uncheck();
                    // Unlike the main input, an existing state for this parent is kept.
                    if !self.iterate_states.contains_key(&p) {
                        trace_worker!("detect scope {:?} in iteration;", batch.tag);
                        self.iterate_states
                            .insert(p, IterateState::new());
                    }
                    enter.notify_end(end)?;
                }
            }

            Ok(())
        })
    }