in eden/mononoke/manifest/src/ordered_ops.rs [282:567]
/// Produces a stream of the differences between the tree `self` and the tree
/// `other`, walking both with `bounded_traversal_ordered_stream`.
///
/// Trees on the `self` (left) side are loaded from `store`; trees on the
/// `other` (right) side are loaded from `other_store`.
///
/// # Parameters
///
/// * `after` - `None` includes everything. `Some(path)` builds the starting
///   position with `After::new`; `Some(None)` means everything after the
///   root, i.e. the root entry itself is not included.
/// * `output_filter` - called with every candidate diff entry. Entries for
///   which it returns `None` are dropped; otherwise the returned value is
///   what the stream yields.
/// * `recurse_pruner` - called with every differing subtree before it is
///   scheduled. When it returns `false` the subtree is not visited at all,
///   which also means no entry is produced for the subtree directory itself
///   (directory entries are only produced when the directory is visited).
///
/// # Returns
///
/// A boxed `'static` stream of `Result<Out, Error>`. If `self == other` the
/// stream is empty and nothing is loaded.
///
/// # Errors
///
/// A failure to load any manifest is propagated out of the per-directory
/// step with `?`. NOTE(review): how that error is surfaced on the stream is
/// decided by `bounded_traversal_ordered_stream` - presumably as an `Err`
/// item; confirm against that function.
fn filtered_diff_ordered<FilterMap, Out, RecursePruner>(
    &self,
    ctx: CoreContext,
    store: Store,
    other: Self,
    other_store: Store,
    after: Option<Option<MPath>>,
    output_filter: FilterMap,
    recurse_pruner: RecursePruner,
) -> BoxStream<'static, Result<Out, Error>>
where
    FilterMap: Fn(
            Diff<Entry<Self, <<Self as StoreLoadable<Store>>::Value as Manifest>::LeafId>>,
        ) -> Option<Out>
        + Send
        + Sync
        + 'static,
    RecursePruner: Fn(&Diff<Self>) -> bool + Send + Sync + 'static,
    Out: Send + Unpin + 'static,
{
    // Identical roots cannot differ anywhere below, so skip the traversal.
    if self == &other {
        return stream::empty().boxed();
    }

    // Schedule a maximum of 256 concurrently unfolding directories.
    let schedule_max = nonzero!(256usize);

    // Allow queueing of up to 2,560 items, which would be 10 items per
    // directory at the maximum concurrency level. Experiments show this
    // is a good balance of queueing items while not spending too long
    // determining what can be scheduled.
    let queue_max = nonzero!(2560usize);

    let after = match after {
        None => {
            // If `after` is `None`, then we include everything.
            After::All
        }
        Some(mpath_opt) => {
            // If `after` is `Some(None)`, then we include everything
            // after the root (i.e. not the root itself).
            After::new(mpath_opt.as_ref())
        }
    };

    // The traversal starts from the pair of roots (path `None`). The first
    // tuple element is presumably the weight of this initial item (it sits
    // where `OrderedTraversal::Recurse` carries a weight below) - here the
    // whole queue budget.
    let init = Some((
        queue_max.get(),
        (Diff::Changed(None, self.clone(), other), after),
    ));
    (async_stream::stream! {
        // Shadow the owned values with references so that the `move`
        // closure and `async move` block below capture borrows instead of
        // taking ownership on every invocation.
        borrowed!(ctx, store, other_store, output_filter, recurse_pruner);
        let s = bounded_traversal::bounded_traversal_ordered_stream(
            schedule_max,
            queue_max,
            init,
            // One step of the traversal: given a differing directory and the
            // `after` state that applies to it, list what it produces, in
            // order, as a mix of output items and child directories to
            // recurse into.
            move |(input, after)| {
                async move {
                    let mut output = Vec::new();

                    // Emits an entry unless the caller's filter drops it.
                    let push_output = |output: &mut Vec<_>, out| {
                        if let Some(out) = output_filter(out) {
                            output.push(OrderedTraversal::Output(out));
                        }
                    };

                    // Schedules a child directory unless the caller prunes it.
                    let push_recurse = |output: &mut Vec<_>, weight, recurse, after| {
                        if recurse_pruner(&recurse) {
                            output.push(OrderedTraversal::Recurse(weight, (recurse, after)));
                        }
                    };

                    match input {
                        // Directory present on both sides with different ids.
                        Diff::Changed(path, left, right) => {
                            // Load both manifests concurrently; either
                            // failure aborts this step.
                            let (left_mf, right_mf) = future::try_join(
                                left.load(ctx, store),
                                right.load(ctx, other_store),
                            )
                            .await?;

                            if after.include_self() {
                                push_output(
                                    &mut output,
                                    Diff::Changed(
                                        path.clone(),
                                        Entry::Tree(left),
                                        Entry::Tree(right),
                                    ),
                                );
                            }

                            let iter = EntryDiffIterator::new(
                                left_mf.list_weighted(),
                                right_mf.list_weighted(),
                            );
                            for (name, left, right) in iter {
                                // Skip names excluded by `after` and names
                                // whose entries are equal on both sides.
                                if after.skip(&name) || left == right {
                                    continue;
                                }
                                let path = Some(MPath::join_opt_element(path.as_ref(), &name));
                                match (left, right) {
                                    (Some(Entry::Leaf(left)), Some(Entry::Leaf(right))) => {
                                        if after.include_file(&name) {
                                            push_output(
                                                &mut output,
                                                Diff::Changed(
                                                    path,
                                                    Entry::Leaf(left),
                                                    Entry::Leaf(right),
                                                ),
                                            );
                                        }
                                    }
                                    (
                                        Some(Entry::Leaf(left)),
                                        Some(Entry::Tree((weight, tree))),
                                    ) => {
                                        // Removed file comes before all
                                        // files in the dir it is replaced
                                        // by.
                                        if after.include_file(&name) {
                                            push_output(
                                                &mut output,
                                                Diff::Removed(path.clone(), Entry::Leaf(left)),
                                            );
                                        }
                                        push_recurse(
                                            &mut output,
                                            weight,
                                            Diff::Added(path, tree),
                                            after.enter_dir(&name),
                                        );
                                    }
                                    (Some(Entry::Leaf(left)), None) => {
                                        if after.include_file(&name) {
                                            push_output(
                                                &mut output,
                                                Diff::Removed(path, Entry::Leaf(left)),
                                            );
                                        }
                                    }
                                    (
                                        Some(Entry::Tree((weight, tree))),
                                        Some(Entry::Leaf(right)),
                                    ) => {
                                        // Added file comes before all
                                        // files in the dir it replaces
                                        if after.include_file(&name) {
                                            push_output(
                                                &mut output,
                                                Diff::Added(path.clone(), Entry::Leaf(right)),
                                            );
                                        }
                                        push_recurse(
                                            &mut output,
                                            weight,
                                            Diff::Removed(path, tree),
                                            after.enter_dir(&name),
                                        );
                                    }
                                    (
                                        Some(Entry::Tree((left_weight, left))),
                                        Some(Entry::Tree((right_weight, right))),
                                    ) => {
                                        // Approximate recursion weight
                                        // using `max`. The theoretical
                                        // max is actually the sum of the
                                        // weights, but that is likely to
                                        // be overkill most of the time.
                                        let weight = left_weight.max(right_weight);
                                        push_recurse(
                                            &mut output,
                                            weight,
                                            Diff::Changed(path, left, right),
                                            after.enter_dir(&name),
                                        );
                                    }
                                    (Some(Entry::Tree((weight, tree))), None) => {
                                        push_recurse(
                                            &mut output,
                                            weight,
                                            Diff::Removed(path, tree),
                                            after.enter_dir(&name),
                                        );
                                    }
                                    (None, Some(Entry::Leaf(right))) => {
                                        if after.include_file(&name) {
                                            // `path` is not needed after
                                            // this point, so it is moved
                                            // rather than cloned.
                                            push_output(
                                                &mut output,
                                                Diff::Added(path, Entry::Leaf(right)),
                                            );
                                        }
                                    }
                                    (None, Some(Entry::Tree((weight, tree)))) => {
                                        push_recurse(
                                            &mut output,
                                            weight,
                                            Diff::Added(path, tree),
                                            after.enter_dir(&name),
                                        );
                                    }
                                    // Nothing on either side: nothing to report.
                                    (None, None) => {}
                                }
                            }
                        }
                        // Directory that only exists on the `other` side:
                        // everything beneath it is reported as added.
                        Diff::Added(path, tree) => {
                            if after.include_self() {
                                push_output(
                                    &mut output,
                                    Diff::Added(path.clone(), Entry::Tree(tree.clone())),
                                );
                            }
                            let manifest = tree.load(ctx, other_store).await?;
                            for (name, entry) in manifest.list_weighted() {
                                if after.skip(&name) {
                                    continue;
                                }
                                let path = Some(MPath::join_opt_element(path.as_ref(), &name));
                                match entry {
                                    Entry::Tree((weight, tree)) => {
                                        push_recurse(
                                            &mut output,
                                            weight,
                                            Diff::Added(path, tree),
                                            after.enter_dir(&name),
                                        );
                                    }
                                    Entry::Leaf(leaf) => {
                                        if after.include_file(&name) {
                                            push_output(
                                                &mut output,
                                                Diff::Added(path, Entry::Leaf(leaf)),
                                            );
                                        }
                                    }
                                }
                            }
                        }
                        // Directory that only exists on the `self` side:
                        // everything beneath it is reported as removed.
                        Diff::Removed(path, tree) => {
                            if after.include_self() {
                                push_output(
                                    &mut output,
                                    Diff::Removed(path.clone(), Entry::Tree(tree.clone())),
                                );
                            }
                            let manifest = tree.load(ctx, store).await?;
                            for (name, entry) in manifest.list_weighted() {
                                if after.skip(&name) {
                                    continue;
                                }
                                let path = Some(MPath::join_opt_element(path.as_ref(), &name));
                                match entry {
                                    Entry::Tree((weight, tree)) => {
                                        push_recurse(
                                            &mut output,
                                            weight,
                                            Diff::Removed(path, tree),
                                            after.enter_dir(&name),
                                        );
                                    }
                                    Entry::Leaf(leaf) => {
                                        if after.include_file(&name) {
                                            push_output(
                                                &mut output,
                                                Diff::Removed(path, Entry::Leaf(leaf)),
                                            );
                                        }
                                    }
                                }
                            }
                        }
                    }

                    Ok(output)
                }
                .boxed()
            },
        );

        // Forward every item of the traversal stream unchanged.
        pin_mut!(s);
        while let Some(value) = s.next().await {
            yield value;
        }
    })
    .boxed()
}