in src/local.rs [626:709]
/// Lists the objects found beneath `prefix` (or beneath the configured root
/// when `prefix` is `None`) as a stream of `ObjectMeta`.
///
/// When `maybe_offset` is supplied, every walked entry whose location compares
/// less than or equal to the offset is skipped.
///
/// The directory walk follows links and never yields the starting directory
/// itself. Only entries that are files and whose location passes
/// `is_valid_file_path` are reported.
///
/// If no tokio runtime is active the walk is driven directly by the returned
/// stream; otherwise entries are fetched in batches of `CHUNK_SIZE` on a
/// blocking task.
///
/// # Errors
///
/// The stream yields an error when `prefix` cannot be mapped to a filesystem
/// path (as its only item), when a walked path cannot be mapped back to a
/// location, when `convert_walkdir_result` or `convert_entry` fail, or when
/// awaiting the blocking task fails.
///
/// # Panics
///
/// Panics if `prefix` is `None` and the configured root cannot be converted to
/// a file path.
fn list_with_maybe_offset(
    &self,
    prefix: Option<&Path>,
    maybe_offset: Option<&Path>,
) -> BoxStream<'static, Result<ObjectMeta>> {
    let config = Arc::clone(&self.config);

    // Work out which directory the walk starts from.
    let walk_root = if let Some(prefix) = prefix {
        match config.prefix_to_filesystem(prefix) {
            Ok(fs_path) => fs_path,
            Err(err) => return futures::future::ready(Err(err)).into_stream().boxed(),
        }
    } else {
        config.root.to_file_path().unwrap()
    };

    // `min_depth(1)` keeps the starting directory itself out of the results.
    let walker = WalkDir::new(walk_root).min_depth(1).follow_links(true);

    let start_after = maybe_offset.cloned();

    let entries = walker.into_iter().flat_map(move |walk_result| {
        // Discard entries at or before the offset as early as possible, so that
        // the file system calls below (statx) are not issued for them. This
        // matters for NFS mounts.
        if let (Some(cutoff), Ok(candidate)) = (start_after.as_ref(), walk_result.as_ref()) {
            let location = match config.filesystem_to_path(candidate.path()) {
                Ok(location) => location,
                Err(err) => return Some(Err(err)),
            };
            if location <= *cutoff {
                return None;
            }
        }

        let dir_entry = match convert_walkdir_result(walk_result) {
            Ok(Some(dir_entry)) => dir_entry,
            Ok(None) => return None,
            Err(err) => return Some(Err(err)),
        };

        // Anything that is not a file is not reported.
        if !dir_entry.path().is_file() {
            return None;
        }

        let location = match config.filesystem_to_path(dir_entry.path()) {
            Ok(location) => location,
            Err(err) => return Some(Err(err)),
        };

        if is_valid_file_path(&location) {
            convert_entry(dir_entry, location).transpose()
        } else {
            None
        }
    });

    // Without a tokio context there is nothing to offload the blocking walk
    // to, so the iterator is exposed as a stream as-is.
    if tokio::runtime::Handle::try_current().is_err() {
        return futures::stream::iter(entries).boxed();
    }

    // With a tokio context, pull up to CHUNK_SIZE entries at a time on a
    // blocking task and hand them out one by one.
    const CHUNK_SIZE: usize = 1024;
    let pending = VecDeque::with_capacity(CHUNK_SIZE);

    futures::stream::try_unfold((entries, pending), |(mut entries, mut pending)| async move {
        if pending.is_empty() {
            // Refill the queue; the iterator and queue are moved into the
            // blocking task and handed back once the batch has been read.
            (entries, pending) = tokio::task::spawn_blocking(move || {
                pending.extend(entries.by_ref().take(CHUNK_SIZE));
                (entries, pending)
            })
            .await?;
        }

        // An empty queue after a refill attempt means the walk is finished.
        pending
            .pop_front()
            .transpose()
            .map(|next| next.map(|meta| (meta, (entries, pending))))
    })
    .boxed()
}