watchman/PendingCollection.cpp (239 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ #include "watchman/PendingCollection.h" #include <folly/Synchronized.h> #include "watchman/Cookie.h" #include "watchman/Logging.h" #include "watchman/watchman_dir.h" using namespace watchman; namespace watchman { const PendingFlags::NameTable PendingFlags::table = { {W_PENDING_CRAWL_ONLY, "CRAWL_ONLY"}, {W_PENDING_RECURSIVE, "RECURSIVE"}, {W_PENDING_NONRECURSIVE_SCAN, "NONRECURSIVE_SCAN"}, {W_PENDING_VIA_NOTIFY, "VIA_NOTIFY"}, {W_PENDING_IS_DESYNCED, "IS_DESYNCED"}, }; bool is_path_prefix( const char* path, size_t path_len, const char* other, size_t common_prefix) { if (common_prefix > path_len) { return false; } w_assert( memcmp(path, other, common_prefix) == 0, "is_path_prefix: %.*s vs %.*s should have %d common_prefix chars\n", (int)path_len, path, (int)common_prefix, other, (int)common_prefix); (void)other; if (common_prefix == path_len) { return true; } return is_slash(path[common_prefix]); } } // namespace watchman void PendingChanges::clear() { pending_.reset(); tree_.clear(); syncs_.clear(); } void PendingChanges::add( const w_string& path, std::chrono::system_clock::time_point now, PendingFlags flags) { auto existing = tree_.search(path); if (existing) { /* Entry already exists: consolidate */ consolidateItem(existing->get(), flags); /* all done */ return; } if (isObsoletedByContainingDir(path)) { return; } // Try to allocate the new node before we prune any children. auto p = std::make_shared<watchman_pending_fs>(path, now, flags); maybePruneObsoletedChildren(path, flags); logf(DBG, "add_pending: {} {}\n", path, flags.format()); tree_.insert(path, p); linkHead(std::move(p)); } void PendingChanges::add( watchman_dir* dir, const char* name, std::chrono::system_clock::time_point now, PendingFlags flags) { return add(dir->getFullPathToChild(name), now, flags); } void PendingChanges::addSync(folly::Promise<folly::Unit> promise) { syncs_.push_back(std::move(promise)); } void PendingChanges::append( std::shared_ptr<watchman_pending_fs> chain, std::vector<folly::Promise<folly::Unit>> syncs) { auto p = std::move(chain); while (p) { auto target_p = tree_.search((const uint8_t*)p->path.data(), p->path.size()); if (target_p) { /* Entry already exists: consolidate */ consolidateItem(target_p->get(), p->flags); p = std::move(p->next); continue; } if (isObsoletedByContainingDir(p->path)) { p = std::move(p->next); continue; } maybePruneObsoletedChildren(p->path, p->flags); auto next = std::move(p->next); tree_.insert(p->path, p); linkHead(std::move(p)); p = std::move(next); } syncs_.insert( syncs_.end(), std::make_move_iterator(syncs.begin()), std::make_move_iterator(syncs.end())); } std::shared_ptr<watchman_pending_fs> PendingChanges::stealItems() { tree_.clear(); return std::move(pending_); } std::vector<folly::Promise<folly::Unit>> PendingChanges::stealSyncs() { std::vector<folly::Promise<folly::Unit>> syncs; std::swap(syncs, syncs_); return syncs; } bool PendingChanges::empty() const { return 0 == tree_.size() && syncs_.empty(); } uint32_t PendingChanges::getPendingItemCount() const { return tree_.size(); } // if there are any entries that are obsoleted by a recursive insert, // walk over them now and mark them as ignored. void PendingChanges::maybePruneObsoletedChildren( w_string path, PendingFlags flags) { if ((flags & (W_PENDING_RECURSIVE | W_PENDING_CRAWL_ONLY)) == W_PENDING_RECURSIVE) { uint32_t pruned = 0; // Since deletion invalidates the iterator, we need to repeatedly // call this to prune out the nodes. It will return 0 once no // matching prefixes are found and deleted. // Deletion is a bit awkward in this radix tree implementation. // We can't recursively delete a given prefix as a built-in operation // and it is non-trivial to add that functionality right now. // When we lop-off a portion of a tree that we're going to analyze // recursively, we have to iterate each leaf and explicitly delete // that leaf. // Since deletion invalidates the iteration state we have to signal // to stop iteration after each deletion and then retry the prefix // deletion. // // We need to compare the prefix to make sure that we don't delete // a sibling node by mistake (see commentary on the is_path_prefix // function for more on that). auto callback = [&](const w_string& key, std::shared_ptr<watchman_pending_fs>& p) -> int { w_check( p, "Pending changes should be removed from both the list and the tree."); if (!p->flags.contains(W_PENDING_CRAWL_ONLY) && key.size() > path.size() && is_path_prefix( (const char*)key.data(), key.size(), path.data(), path.size()) && !isPossiblyACookie(p->path)) { logf( DBG, "delete_kids: removing ({}) {} from pending because it is " "obsoleted by ({}) {}\n", p->path.size(), p->path, path.size(), path); // Unlink the child from the pending index. unlinkItem(p); // Remove it from the art tree. tree_.erase(key); // Stop iteration because we just invalidated the iterator state // by modifying the tree mid-iteration. return 1; } return 0; }; while (tree_.iterPrefix( reinterpret_cast<const uint8_t*>(path.data()), path.size(), callback)) { // OK; try again ++pruned; } if (pruned) { logf( DBG, "maybePruneObsoletedChildren: pruned {} nodes under ({}) {}\n", pruned, path.size(), path); } } } void PendingChanges::consolidateItem( watchman_pending_fs* p, PendingFlags flags) { // Increase the strength of the pending item if either of these // flags are set. // We upgrade crawl-only as well as recursive; it indicates that // we've recently just performed the stat and we want to avoid // infinitely trying to stat-and-crawl p->flags.set( flags & (W_PENDING_CRAWL_ONLY | W_PENDING_RECURSIVE | W_PENDING_NONRECURSIVE_SCAN | W_PENDING_IS_DESYNCED)); maybePruneObsoletedChildren(p->path, p->flags); } // Check the tree to see if there is a path that is earlier/higher in the // filesystem than the input path; if there is, and it is recursive, // return true to indicate that there is no need to track this new path // due to the already scheduled higher level path. bool PendingChanges::isObsoletedByContainingDir(const w_string& path) { auto leaf = tree_.longestMatch((const uint8_t*)path.data(), path.size()); if (!leaf) { return false; } auto p = leaf->value; if ((p->flags & (W_PENDING_RECURSIVE | W_PENDING_CRAWL_ONLY)) == W_PENDING_RECURSIVE && is_path_prefix( path.data(), path.size(), (const char*)leaf->key.data(), leaf->key.size())) { if (isPossiblyACookie(path)) { return false; } // Yes: the pre-existing entry higher up in the tree obsoletes this // one that we would add now. logf(DBG, "is_obsoleted: SKIP {} is obsoleted by {}\n", path, p->path); return true; } return false; } // Helper to doubly-link a pending item to the head of a collection. void PendingChanges::linkHead(std::shared_ptr<watchman_pending_fs>&& p) { p->prev.reset(); p->next = pending_; if (p->next) { p->next->prev = p; } pending_ = std::move(p); } // Helper to un-doubly-link a pending item. void PendingChanges::unlinkItem(std::shared_ptr<watchman_pending_fs>& p) { if (pending_ == p) { pending_ = p->next; } auto prev = p->prev.lock(); if (prev) { prev->next = p->next; } if (p->next) { p->next->prev = prev; } p->next.reset(); p->prev.reset(); } PendingCollectionBase::PendingCollectionBase(std::condition_variable& cond) : cond_(cond) {} void PendingCollectionBase::ping() { pinged_ = true; cond_.notify_all(); } bool PendingCollectionBase::checkAndResetPinged() { if (pending_ || pinged_) { pinged_ = false; return true; } return false; } PendingCollection::PendingCollection() : folly::Synchronized<PendingCollectionBase, std::mutex>{ folly::in_place, cond_} {} PendingCollection::LockedPtr PendingCollection::lockAndWait( std::chrono::milliseconds timeoutms) { auto lock = this->lock(); if (lock->checkAndResetPinged()) { return lock; } if (timeoutms.count() == -1) { cond_.wait(lock.as_lock()); } else { cond_.wait_for(lock.as_lock(), timeoutms); } lock->checkAndResetPinged(); return lock; } /* vim:ts=2:sw=2:et: */