fn get_line_processor()

in src/dachshund/transformer_base.rs [24:91]


    fn get_line_processor(&self) -> Arc<dyn LineProcessorBase>;
    // logic for taking row and storing into self via side-effect
    fn process_row(&mut self, row: Box<dyn Row>) -> CLQResult<()>;
    // logic for processing batch of rows, once all rows are ready
    fn process_batch(
        &mut self,
        graph_id: GraphId,
        output: &Sender<(Option<String>, bool)>,
    ) -> CLQResult<()>;
    // reset transformer state after processing;
    fn reset(&mut self) -> CLQResult<()>;

    // main loop, runs through lines ordered by graph_id, updates state accordingly
    // and runs process_batch when graph_id changes
    fn run(&mut self, input: Input, mut output: Output) -> CLQResult<()> {
        let ret = crossbeam::scope(|scope| {
            let line_processor = self.get_line_processor();
            let num_processed = Arc::new(AtomicUsize::new(0 as usize));
            let (sender, receiver) = channel();
            let num_processed_clone = num_processed.clone();
            let writer = scope.spawn(move |_| loop {
                match receiver.recv() {
                    Ok((line, shutdown)) => {
                        if shutdown {
                            return;
                        }
                        if let Some(string) = line {
                            output.print(string).unwrap();
                        }
                        num_processed_clone.fetch_add(1, Ordering::SeqCst);
                    }
                    Err(error) => panic!("{}", error),
                }
            });
            let mut current_graph_id: Option<GraphId> = None;
            let mut num_to_process: usize = 0;
            for line in input.lines() {
                match line {
                    Ok(n) => {
                        let row: Box<dyn Row> = line_processor.process_line(n)?;
                        let new_graph_id: GraphId = row.get_graph_id();
                        if let Some(some_current_graph_id) = current_graph_id {
                            if new_graph_id != some_current_graph_id {
                                self.process_batch(some_current_graph_id, &sender.clone())?;
                                num_to_process += 1;
                                self.reset()?;
                            }
                        }
                        current_graph_id = Some(new_graph_id);
                        self.process_row(row)?;
                    }
                    Err(error) => eprintln!("I/O error: {}", error),
                }
            }
            if let Some(some_current_graph_id) = current_graph_id {
                self.process_batch(some_current_graph_id, &sender)?;
                num_to_process += 1;
                while num_to_process > num_processed.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_millis(100));
                }
                sender.send((None, true)).unwrap();
                writer.join().unwrap();
                return Ok(());
            }
            Err("No input rows!".into())
        });
        ret.unwrap()
    }