fn push()

in interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/exchange.rs [293:523]


    /// Pushes `batch` through this exchange, distributing its content over `self.pushes`.
    ///
    /// Processing order:
    /// 1. If `self.blocks` already holds queued entries for `batch.tag`, the batch is appended
    ///    to that queue and the result of `would_block!` is returned, so the batch keeps its
    ///    order behind the entries queued earlier.
    /// 2. If there is exactly one underlying push, the batch is forwarded to it directly. When
    ///    the batch is at this channel's scope level and `cancel_handle` reports the scope as
    ///    canceled, the batch is cleared first and dropped unless it is the last one.
    /// 3. Otherwise the batch is handled according to its length:
    ///    - empty: only an attached end (if any) is propagated to the underlying pushes;
    ///    - one item: the item is routed individually (pushed together with the end, or put
    ///      into the per-target buffer when there is no end);
    ///    - more items: delegated to `push_inner`.
    ///
    /// # Errors
    /// Returns an `IOErrorKind::Internal` error when a non-empty batch has a scope level
    /// different from the channel's, when an end arrives whose peers do not contain `self.src`
    /// after data was already pushed, or when an end without peers has a non-zero sequence.
    /// Errors of the underlying pushes, the router and the buffers are propagated, as is the
    /// result of `would_block!` when no buffer is available.
    fn push(&mut self, mut batch: MicroBatch<D>) -> Result<(), IOError> {
        // Builds an `IOErrorKind::Internal` error whose io cause carries `msg`.
        fn internal_error<M>(msg: M) -> IOError
        where
            M: Into<Box<dyn std::error::Error + Send + Sync>>,
        {
            let mut err = IOError::new(IOErrorKind::Internal);
            err.set_io_cause(std::io::Error::new(std::io::ErrorKind::Other, msg));
            err
        }

        if !self.blocks.is_empty() {
            if let Some(b) = self.blocks.get_mut(&batch.tag) {
                warn_worker!(
                    "output[{:?}] block pushing batch of {:?} to channel[{}] ;",
                    self.port,
                    batch.tag,
                    self.index
                );
                // Queue behind the entries already blocked for this tag.
                b.push_back(BlockEntry::Batch(batch));
                return would_block!("no buffer available in exchange;");
            }
        }

        let level = batch.tag.len() as u32;

        if self.pushes.len() == 1 {
            // Single target: nothing to route, forward the batch as it is.
            if level == self.scope_level && self.cancel_handle.is_canceled(&batch.tag, 0) {
                batch.clear();
                // A cleared batch is still forwarded when it is the last one of its scope.
                if !batch.is_last() {
                    return Ok(());
                }
            }
            return self.pushes[0].push(batch);
        }

        let len = batch.len();

        // Batches carrying data must belong to the scope level of this channel; empty batches
        // are exempt because they may carry the end of a parent scope.
        if len > 0 && level != self.scope_level {
            return Err(internal_error(
                format!("scope_level in batch is not equal to that in channel, scope_level in batch: {}, scope_level in channel: {}", level, self.scope_level)
            ));
        }

        if len == 0 {
            if let Some(end) = batch.take_end() {
                if level == self.scope_level {
                    if end.peers().value() == 0 {
                        // TODO: seems unreachable;
                        if batch.get_seq() != 0 {
                            return Err(internal_error("Batch seq is not 0"));
                        }
                        // handle empty stream
                        let owner = if level > 0 {
                            self.route
                                .magic
                                .exec(end.tag.current_uncheck() as u64) as u32
                        } else {
                            0
                        };
                        if self.src == owner {
                            for p in self.pushes.iter_mut() {
                                let mut new_end = end.clone();
                                new_end.total_send = 0;
                                new_end.global_total_send = 0;
                                new_end.update_peers(DynPeers::single(self.src), self.total_peers);
                                p.push_end(new_end, DynPeers::single(self.src))?;
                            }
                        } else {
                            trace_worker!(
                                "output[{:?}]: ignore end of scope {:?} as peers = {:?}",
                                self.port,
                                end.tag,
                                end.peers()
                            );
                        }
                        return Ok(());
                    }

                    if batch.get_seq() == 0 {
                        // it's the first batch need to be pushed on this port, and it's an empty batch;
                        if !end.peers_contains(self.src) {
                            trace_worker!(
                                "output[{:?}]: ignore end of scope {:?} as peers = {:?}",
                                self.port,
                                end.tag,
                                end.peers()
                            );
                            return Ok(());
                        }

                        for p in self.pushes.iter_mut() {
                            let mut new_end = end.clone();
                            new_end.total_send = 0;
                            new_end.global_total_send = 0;
                            if end.tag.is_root() {
                                p.sync_end(new_end, DynPeers::all(self.total_peers))?;
                            } else {
                                p.sync_end(new_end, DynPeers::empty())?;
                            }
                        }
                    } else {
                        // not the first batch;
                        if !end.peers_contains(self.src) {
                            return Err(internal_error(format!(
                                "Push illegal data without permission, peers: {:?}, src: {};",
                                end.peers(),
                                self.src
                            )));
                        }
                        self.flush_last_buffer(batch.tag())?;
                        for (i, (t, g, children)) in self.update_end(None, &end).enumerate() {
                            let mut new_end = end.clone();
                            new_end.total_send = t;
                            new_end.global_total_send = g;
                            self.pushes[i].sync_end(new_end, children)?;
                        }
                    };
                } else {
                    // this is an end of parent scope;
                    for p in self.pushes.iter_mut() {
                        let mut new_end = end.clone();
                        new_end.total_send = 0;
                        new_end.global_total_send = 0;
                        p.sync_end(new_end, DynPeers::all(self.total_peers))?;
                    }
                }
            } else {
                warn_worker!("output[{:?}]: ignore empty batch of {:?};", self.port, batch.tag);
            }
            Ok(())
        } else if len == 1 {
            // only one data, not need re-batching;
            if let Some(end) = batch.take_end() {
                if !end.peers_contains(self.src) {
                    return Err(internal_error(format!(
                        "Push illegal data without permission, peers: {:?}, src: {};",
                        end.peers(),
                        self.src
                    )));
                }
                let x = batch
                    .get(0)
                    .expect("expect at least one entry as len = 1");
                let target = self.route.route(x)? as usize;
                if batch.get_seq() == 0 {
                    // multi source;
                    self.pushes[target].push(batch)?;
                    let children = DynPeers::all(self.total_peers);
                    for i in 0..self.pushes.len() {
                        // Every target gets its own copy of the end: only the routed target has
                        // received the single item, while the global count is 1 for all of them.
                        let mut new_end = end.clone();
                        new_end.total_send = if i == target { 1 } else { 0 };
                        new_end.global_total_send = 1;
                        self.pushes[i].sync_end(new_end, children.clone())?;
                    }
                } else {
                    // flush previous buffered data;
                    self.flush_last_buffer(&batch.tag)?;
                    let result = self.update_end(Some(target), &end);
                    if end.peers().value() == 1 {
                        for (i, (t, g, children)) in result.enumerate() {
                            let mut new_end = end.clone();
                            new_end.total_send = t;
                            new_end.global_total_send = g;
                            if i == target {
                                // Move the data out of `batch` so that it can be sent together
                                // with its end in one push.
                                let mut batch = std::mem::replace(&mut batch, MicroBatch::empty());
                                new_end.update_peers(children, self.total_peers);
                                batch.set_end(new_end);
                                self.pushes[i].push(batch)?;
                            } else {
                                self.pushes[i].push_end(new_end, children)?;
                            }
                        }
                    } else {
                        self.pushes[target].push(batch)?;
                        for (i, (t, g, children)) in result.into_iter().enumerate() {
                            let mut new_end = end.clone();
                            new_end.total_send = t;
                            new_end.global_total_send = g;
                            self.pushes[i].sync_end(new_end, children)?;
                        }
                    }
                }
            } else {
                let item = batch
                    .next()
                    .expect("expect at least one entry as len = 1");
                let target = self.route.route(&item)? as usize;
                match self.buffers[target].push(&batch.tag, item) {
                    // The buffer handed back a batch: send it to the target.
                    Ok(Some(buf)) => self.push_to(target, batch.tag().clone(), buf)?,
                    Ok(None) => (),
                    Err(e) => {
                        // Keep the rejected item (if it was handed back) in the block queue of
                        // its tag, then report the would-block condition to the caller.
                        if let Some(x) = e.0 {
                            self.blocks
                                .get_mut_or_insert(&batch.tag)
                                .push_back(BlockEntry::Single((target, x)));
                            trace_worker!(
                                "output[{:?}] blocked when push data of {:?} ;",
                                self.port,
                                batch.tag,
                            );
                        }
                        would_block!("no buffer available in exchange;")?;
                    }
                }
            }
            Ok(())
        } else {
            self.push_inner(batch)
        }
    }