fn filtered_diff_ordered()

in eden/mononoke/manifest/src/ordered_ops.rs [282:567]


    /// Stream the differences between the manifest rooted at `self` (the
    /// "left" side) and the manifest rooted at `other` (the "right" side),
    /// passing every difference through `output_filter`.
    ///
    /// Returns an empty stream straight away when `self == other`.
    ///
    /// # Parameters
    ///
    /// * `ctx` - context handed to every manifest `load` call.
    /// * `store` - store used to load left-side (i.e. `self`-derived) trees.
    /// * `other` - root of the manifest to compare against.
    /// * `other_store` - store used to load right-side (i.e. `other`-derived)
    ///   trees.
    /// * `after` - resume point.  `None` includes everything.  `Some(None)`
    ///   includes everything after the root (i.e. not the root itself).
    ///   `Some(Some(path))` is forwarded to `After::new`; presumably this
    ///   resumes after `path` (confirm against `After`).
    /// * `output_filter` - invoked on every candidate diff entry; returning
    ///   `Some(out)` emits `out`, returning `None` drops the entry.
    /// * `recurse_pruner` - invoked on every directory-level diff before it
    ///   is scheduled; returning `false` skips that subtree entirely.
    ///
    /// # Errors
    ///
    /// A failed manifest load makes the corresponding unfold step return
    /// `Err`, discarding any outputs that step had already collected.  How
    /// the error is surfaced in the stream is decided by
    /// `bounded_traversal_ordered_stream` (presumably as an `Err` item).
    fn filtered_diff_ordered<FilterMap, Out, RecursePruner>(
        &self,
        ctx: CoreContext,
        store: Store,
        other: Self,
        other_store: Store,
        after: Option<Option<MPath>>,
        output_filter: FilterMap,
        recurse_pruner: RecursePruner,
    ) -> BoxStream<'static, Result<Out, Error>>
    where
        FilterMap: Fn(
                Diff<Entry<Self, <<Self as StoreLoadable<Store>>::Value as Manifest>::LeafId>>,
            ) -> Option<Out>
            + Send
            + Sync
            + 'static,
        RecursePruner: Fn(&Diff<Self>) -> bool + Send + Sync + 'static,
        Out: Send + Unpin + 'static,
    {
        // Identical roots cannot differ anywhere, so skip the traversal.
        if self == &other {
            return stream::empty().boxed();
        }

        // Schedule a maximum of 256 concurrently unfolding directories.
        let schedule_max = nonzero!(256usize);

        // Allow queueing of up to 2,560 items, which would be 10 items per
        // directory at the maximum concurrency level.  Experiments show this
        // is a good balance of queueing items while not spending too long
        // determining what can be scheduled.
        let queue_max = nonzero!(2560usize);

        let after = match after {
            None => {
                // If `after` is `None`, then we include everything.
                After::All
            }
            Some(mpath_opt) => {
                // If `after` is `Some(None)`, then we include everything
                // after the root (i.e. not the root itself).
                After::new(mpath_opt.as_ref())
            }
        };

        // Seed the traversal with the two roots as a single "changed"
        // directory at the root path (`None`).  The first tuple element sits
        // where recursion weights go below; the root is given the full queue
        // budget.
        let init = Some((
            queue_max.get(),
            (Diff::Changed(None, self.clone(), other), after),
        ));

        (async_stream::stream! {
            // The owned values live inside the stream; the unfold closure
            // below only captures references to them.
            borrowed!(ctx, store, other_store, output_filter, recurse_pruner);

            let s = bounded_traversal::bounded_traversal_ordered_stream(
                schedule_max,
                queue_max,
                init,
                move |(input, after)| {
                    async move {
                        // Outputs and recursions for this directory, in the
                        // order they are pushed.
                        let mut output = Vec::new();

                        // Emit a diff entry unless the caller's filter drops it.
                        let push_output = |output: &mut Vec<_>, out| {
                            if let Some(out) = output_filter(out) {
                                output.push(OrderedTraversal::Output(out));
                            }
                        };

                        // Schedule a subdirectory unless the caller prunes it.
                        let push_recurse = |output: &mut Vec<_>, weight, recurse, after| {
                            if recurse_pruner(&recurse) {
                                output.push(OrderedTraversal::Recurse(weight, (recurse, after)));
                            }
                        };

                        match input {
                            Diff::Changed(path, left, right) => {
                                // Left trees come from `store`, right trees
                                // from `other_store`; load both concurrently.
                                let (left_mf, right_mf) = future::try_join(
                                    left.load(ctx, store),
                                    right.load(ctx, other_store),
                                )
                                .await?;

                                if after.include_self() {
                                    push_output(
                                        &mut output,
                                        Diff::Changed(
                                            path.clone(),
                                            Entry::Tree(left),
                                            Entry::Tree(right),
                                        ),
                                    );
                                }

                                let iter = EntryDiffIterator::new(
                                    left_mf.list_weighted(),
                                    right_mf.list_weighted(),
                                );
                                for (name, left, right) in iter {
                                    // Skip names excluded by the resume point
                                    // and entries equal on both sides.
                                    if after.skip(&name) || left == right {
                                        continue;
                                    }
                                    let path = Some(MPath::join_opt_element(path.as_ref(), &name));
                                    match (left, right) {
                                        (Some(Entry::Leaf(left)), Some(Entry::Leaf(right))) => {
                                            if after.include_file(&name) {
                                                push_output(
                                                    &mut output,
                                                    Diff::Changed(
                                                        path,
                                                        Entry::Leaf(left),
                                                        Entry::Leaf(right),
                                                    ),
                                                );
                                            }
                                        }
                                        (
                                            Some(Entry::Leaf(left)),
                                            Some(Entry::Tree((weight, tree))),
                                        ) => {
                                            // Removed file comes before all
                                            // files in the dir it is replaced
                                            // by.
                                            if after.include_file(&name) {
                                                push_output(
                                                    &mut output,
                                                    Diff::Removed(path.clone(), Entry::Leaf(left)),
                                                );
                                            }
                                            push_recurse(
                                                &mut output,
                                                weight,
                                                Diff::Added(path, tree),
                                                after.enter_dir(&name),
                                            );
                                        }
                                        (Some(Entry::Leaf(left)), None) => {
                                            if after.include_file(&name) {
                                                push_output(
                                                    &mut output,
                                                    Diff::Removed(path, Entry::Leaf(left)),
                                                );
                                            }
                                        }
                                        (
                                            Some(Entry::Tree((weight, tree))),
                                            Some(Entry::Leaf(right)),
                                        ) => {
                                            // Added file comes before all
                                            // files in the dir it replaces
                                            if after.include_file(&name) {
                                                push_output(
                                                    &mut output,
                                                    Diff::Added(path.clone(), Entry::Leaf(right)),
                                                );
                                            }
                                            push_recurse(
                                                &mut output,
                                                weight,
                                                Diff::Removed(path, tree),
                                                after.enter_dir(&name),
                                            );
                                        }
                                        (
                                            Some(Entry::Tree((left_weight, left))),
                                            Some(Entry::Tree((right_weight, right))),
                                        ) => {
                                            // Approximate recursion weight
                                            // using `max`.  The theoretical
                                            // max is actually the sum of the
                                            // weights, but that is likely to
                                            // be overkill most of the time.
                                            let weight = left_weight.max(right_weight);
                                            push_recurse(
                                                &mut output,
                                                weight,
                                                Diff::Changed(path, left, right),
                                                after.enter_dir(&name),
                                            );
                                        }
                                        (Some(Entry::Tree((weight, tree))), None) => {
                                            push_recurse(
                                                &mut output,
                                                weight,
                                                Diff::Removed(path, tree),
                                                after.enter_dir(&name),
                                            );
                                        }
                                        (None, Some(Entry::Leaf(right))) => {
                                            // `path` is not used again in this
                                            // iteration, so move it rather
                                            // than cloning.
                                            if after.include_file(&name) {
                                                push_output(
                                                    &mut output,
                                                    Diff::Added(path, Entry::Leaf(right)),
                                                );
                                            }
                                        }
                                        (None, Some(Entry::Tree((weight, tree)))) => {
                                            push_recurse(
                                                &mut output,
                                                weight,
                                                Diff::Added(path, tree),
                                                after.enter_dir(&name),
                                            );
                                        }
                                        (None, None) => {}
                                    }
                                }
                            }
                            Diff::Added(path, tree) => {
                                if after.include_self() {
                                    push_output(
                                        &mut output,
                                        Diff::Added(path.clone(), Entry::Tree(tree.clone())),
                                    );
                                }
                                // Added trees exist only on the right side,
                                // so they are loaded from `other_store`.
                                let manifest = tree.load(ctx, other_store).await?;
                                for (name, entry) in manifest.list_weighted() {
                                    if after.skip(&name) {
                                        continue;
                                    }
                                    let path = Some(MPath::join_opt_element(path.as_ref(), &name));
                                    match entry {
                                        Entry::Tree((weight, tree)) => {
                                            push_recurse(
                                                &mut output,
                                                weight,
                                                Diff::Added(path, tree),
                                                after.enter_dir(&name),
                                            );
                                        }
                                        Entry::Leaf(leaf) => {
                                            if after.include_file(&name) {
                                                push_output(
                                                    &mut output,
                                                    Diff::Added(path, Entry::Leaf(leaf)),
                                                );
                                            }
                                        }
                                    }
                                }
                            }
                            Diff::Removed(path, tree) => {
                                if after.include_self() {
                                    push_output(
                                        &mut output,
                                        Diff::Removed(path.clone(), Entry::Tree(tree.clone())),
                                    );
                                }
                                // Removed trees exist only on the left side,
                                // so they are loaded from `store`.
                                let manifest = tree.load(ctx, store).await?;
                                for (name, entry) in manifest.list_weighted() {
                                    if after.skip(&name) {
                                        continue;
                                    }
                                    let path = Some(MPath::join_opt_element(path.as_ref(), &name));
                                    match entry {
                                        Entry::Tree((weight, tree)) => {
                                            push_recurse(
                                                &mut output,
                                                weight,
                                                Diff::Removed(path, tree),
                                                after.enter_dir(&name),
                                            );
                                        }
                                        Entry::Leaf(leaf) => {
                                            if after.include_file(&name) {
                                                push_output(
                                                    &mut output,
                                                    Diff::Removed(path, Entry::Leaf(leaf)),
                                                );
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        Ok(output)
                    }
                    .boxed()
                },
            );

            // Forward every item of the traversal stream unchanged.
            pin_mut!(s);
            while let Some(value) = s.next().await {
                yield value;
            }
        })
        .boxed()
    }