in merkledb/src/merkledb_ingestion_v1.rs [39:104]
fn ingest_dir(
    &mut self,
    input: PathBuf,
    after_file_date: Option<SystemTime>,
    metadata_output_file: &mut BufWriter<File>,
) -> Option<MerkleNode> {
    let now = Instant::now();
    debug!("Ingesting dir {:?}", input);
    let mut staging = self.start_insertion_staging();
    // Walkdir usage copied from https://docs.rs/walkdir/2.3.2/walkdir/
    // TODO: magic constant here. Probably change to something like k * nCPUs
    let (tx, rx) = sync_channel::<(Vec<ChunkInfo>, PathBuf)>(64);
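    // A possible shape for the TODO above (an assumption, not original code):
    // derive the channel bound from the core count, e.g. with the `num_cpus` crate:
    //
    //     let bound = 4 * num_cpus::get();
    //     let (tx, rx) = sync_channel::<(Vec<ChunkInfo>, PathBuf)>(bound);
    //
    // Producer: a background thread walks `input` and chunks files in parallel,
    // sending (chunks, path) pairs over the bounded channel above.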
    thread::spawn(move || {
        WalkDir::new(&input)
            .follow_links(false) // do not follow symlinks
            .into_iter()
            .filter_entry(|e| !e.path_is_symlink()) // do not read symlinks
            .filter_map(|e| e.ok())
            // We use par_bridge() instead of collecting then par_iter(), since
            // when there are millions of files, collecting will be terrifyingly
            // memory intensive.
            .par_bridge()
            .filter(|p| p.file_type().is_file())
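            // Optionally keep only files modified at or after `after_file_date`;
            // files whose modification time cannot be read are kept.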
            .filter(|p| {
                if let Some(after) = after_file_date {
                    if let Ok(Ok(modified)) = p.metadata().map(|m| m.modified()) {
                        modified >= after
                    } else {
                        true
                    }
                } else {
                    true
                }
            })
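            // Chunk each file with the low-variance CDC chunker and pair the
            // chunk list with the file's path for the consumer below.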
            .map(|entry| {
                let path = entry.into_path();
                let input_file = match File::open(&path) {
                    Err(why) => panic!("Cannot open {path:?}: {why}"),
                    Ok(file) => file,
                };
                let mut buf_reader = BufReader::new(input_file);
                let chunks = low_variance_chunk_target(
                    &mut buf_reader,
                    TARGET_CDC_CHUNK_SIZE,
                    N_LOW_VARIANCE_CDC_CHUNKERS,
                );
                (chunks, path)
            })
            .for_each(|x| tx.send(x).unwrap());
    });
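    // Consumer: drain the channel on the calling thread. Each file's chunks are
    // added to the staging area and its hash is written to the metadata output;
    // files that produced no chunks are recorded with the default hash.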
    while let Ok((chunks, path)) = rx.recv() {
        if !chunks.is_empty() {
            let hash = self.add_file(&mut staging, &chunks);
            writeln!(metadata_output_file, "{hash:x} {path:?}").unwrap();
        } else {
            writeln!(metadata_output_file, "{:x} {:?}", MerkleHash::default(), path).unwrap();
        }
    }
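    // The loop above exits once the producer thread finishes and drops `tx`;
    // finalize the staging area into the directory's root node and flush the DB.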
    let dirroot = self.finalize(staging);
    self.flush().unwrap();
    let chunk_time = now.elapsed().as_secs_f64();
    info!("Completed chunking in {:.2?}", now.elapsed());
    let total_length = dirroot.len() as f64;
    info!("Chunking speed: {} MB/s", total_length / 1024.0 / 1024.0 / chunk_time);
    Some(dirroot)
}
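
// Hypothetical call site (illustrative sketch; the concrete type holding this
// impl and the output location are assumptions, not part of this file):
//
//     let out_file = File::create("dir_metadata.txt").unwrap();
//     let mut metadata_out = BufWriter::new(out_file);
//     // Ingest everything under ./data with no modification-date filter.
//     let root = db.ingest_dir(PathBuf::from("./data"), None, &mut metadata_out);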