// fn consumer — from src/dumper.rs, lines 388–453

/// Worker-thread loop: pulls `JobItem`s off the shared channel until the
/// queue is poisoned (a `None` job) or the channel disconnects.
///
/// `JobType::Get` jobs read and parse an object file, merging the result
/// into `results` keyed by debug id. When the last `Get` job completes
/// (tracked by `counter`), the thread enqueues the dump/store jobs via
/// `send_store_jobs`. `JobType::Dump` jobs write a finished object out.
///
/// On any parse/merge failure the queue is poisoned with `num_threads`
/// `None` markers so sibling consumers exit instead of blocking on `recv`.
fn consumer(
    arch: Arch,
    sender: Sender<Option<JobItem>>,
    receiver: Receiver<Option<JobItem>>,
    results: Arc<Mutex<HashMap<String, ObjectInfo>>>,
    counter: Arc<AtomicUsize>,
    num_threads: usize,
    output: Output,
    check_cfi: bool,
) -> common::Result<()> {
    // `Ok(None)` (poison marker) and `Err` (disconnected) both end the loop.
    while let Ok(Some(job)) = receiver.recv() {
        let JobItem {
            file,
            typ,
            mapping,
            collect_inlines,
        } = job;

        match typ {
            JobType::Get => {
                let path = PathBuf::from(file);
                let filename = utils::get_filename(&path);
                let buf = utils::read_file(&path);

                // Poison the queue on failure so the other consumers wake up
                // and exit rather than waiting forever on `recv` (matches the
                // error handling of the merge path below).
                let info =
                    get_object_info(buf, &path, &filename, mapping, arch, None, collect_inlines)
                        .inspect_err(|_e| poison_queue(&sender, num_threads))?;

                let mut results = results.lock().unwrap();
                // Another file with the same debug id may already have been
                // processed; merge the two infos into one entry.
                let info = if let Some(prev) = results.remove(info.get_debug_id()) {
                    ObjectInfo::merge(info, prev).inspect_err(|_e| {
                        poison_queue(&sender, num_threads);
                    })?
                } else {
                    info
                };
                results.insert(info.get_debug_id().to_string(), info);
            }
            JobType::Dump(d) => {
                self::store(&output, check_cfi, d)?;
                // Dump jobs don't count against the remaining-files counter.
                continue;
            }
        }

        // Atomically decrement the remaining-files counter; the thread that
        // handled the last file (previous value == 1) enqueues the dump/store
        // jobs and poisons the queue. A single `fetch_sub` must serve as both
        // the check and the decrement: a separate `load` + `fetch_sub` lets
        // two threads each observe the same value and both decrement, so no
        // thread would ever see the "last file" state and `send_store_jobs`
        // would never run, deadlocking every consumer on `recv`.
        if counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            let mut results = results.lock().unwrap();
            send_store_jobs(
                &sender,
                &mut results,
                num_threads,
                output.clone(),
                check_cfi,
                collect_inlines,
            )?;
        }
    }

    Ok(())
}