glean/rts/factset.cpp (286 lines of code) (raw):

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

#include "glean/rts/factset.h"

namespace facebook {
namespace glean {
namespace rts {

// Look up the id of the fact with predicate `type` and key `key`.
// Returns Id::invalid() if this set holds no such fact.
Id FactSet::idByKey(Pid type, folly::ByteRange key) {
  if (const auto p = keys.lookup(type)) {
    const auto i = p->find(key);
    if (i != p->end()) {
      return (*i)->id();
    }
  }
  return Id::invalid();
}

// Return the predicate id of fact `id`, or Pid::invalid() if `id` lies
// outside the range [starting_id, starting_id + facts.size()).
Pid FactSet::typeById(Id id) {
  if (id >= starting_id) {
    const auto i = distance(starting_id, id);
    if (i < facts.size()) {
      return facts[i]->type();
    }
  }
  return Pid::invalid();
}

// Invoke `f` with the predicate id and clause of fact `id`.
// Returns true if the fact is in this set (and `f` was called), false
// otherwise (in which case `f` is not called).
bool FactSet::factById(Id id, std::function<void(Pid, Fact::Clause)> f) {
  if (id >= starting_id) {
    const auto i = distance(starting_id, id);
    if (i < facts.size()) {
      f(facts[i]->type(), facts[i]->clause());
      return true;
    }
  }
  return false;
}

// Number of facts of predicate `pid` in this set (0 if the predicate has no
// key table here).
Interval FactSet::count(Pid pid) const {
  const auto p = keys.lookup(pid);
  return p ? p->size() : 0;
}

// Forward iteration over facts, starting at lower_bound(from). Iteration
// stops at lower_bound(upto), or at end() when `upto` converts to false.
std::unique_ptr<FactIterator> FactSet::enumerate(Id from, Id upto) {
  struct Iterator final : FactIterator {
    using iter_t = FactSet::const_iterator;
    iter_t pos;
    const iter_t end;

    Iterator(iter_t p, iter_t e) : pos(p), end(e) {}

    void next() override {
      assert(pos != end);
      ++pos;
    }

    // Yields an invalid ref once the iterator is exhausted.
    Fact::Ref get(Demand) override {
      return pos != end ? pos->ref() : Fact::Ref::invalid();
    }
  };

  return std::make_unique<Iterator>(
      lower_bound(from), upto ? lower_bound(upto) : end());
}

// Backward iteration over facts. `pos` always points one past the current
// fact: it starts at lower_bound(from) (or end() when `from` converts to
// false) and iteration stops once it reaches lower_bound(downto).
std::unique_ptr<FactIterator> FactSet::enumerateBack(Id from, Id downto) {
  struct BackIterator final : FactIterator {
    using iter_t = FactSet::const_iterator;
    iter_t pos;
    const iter_t end;

    BackIterator(iter_t p, iter_t e) : pos(p), end(e) {}

    void next() override {
      assert(pos != end);
      --pos;
    }

    Fact::Ref get(Demand) override {
      if (pos != end) {
        // The current fact is the one just before `pos`.
        auto i = pos;
        --i;
        return i->ref();
      } else {
        return Fact::Ref::invalid();
      }
    }
  };

  return std::make_unique<BackIterator>(
      from ? lower_bound(from) : end(), lower_bound(downto));
}

// Iterate, in key order, over the facts of predicate `type` whose keys are
// >= `start` and which share the first `prefix_size` bytes of `start`.
// Requires prefix_size <= start.size() (checked by assert only).
std::unique_ptr<FactIterator>
FactSet::seek(Pid type, folly::ByteRange start, size_t prefix_size) {
  struct SeekIterator : FactIterator {
    explicit SeekIterator(Index::map_t::iterator b, Index::map_t::iterator e)
        : current(b), end(e) {}

    void next() override {
      assert(current != end);
      ++current;
    }

    // Yields an invalid ref once the iterator is exhausted.
    Fact::Ref get(Demand) override {
      return current != end ? current->second->ref() : Fact::Ref::invalid();
    }

    Index::map_t::const_iterator current;
    const Index::map_t::const_iterator end;
  };

  assert(prefix_size <= start.size());

  if (const auto p = keys.lookup(type)) {
    auto& entry = index[type];
    // Check if the entry is up to date (i.e., has the same number of items as
    // the key hashmap). If it doesn't, fill it.
    if (!entry.withRLock([&](auto& map) { return map.size() == p->size(); })) {
      entry.withWLock([&](auto& map) {
        // Re-check under the write lock: another thread may have rebuilt the
        // map between our read-locked check and acquiring the write lock.
        if (map.size() != p->size()) {
          map.clear();
          for (const Fact* fact : *p) {
            map.insert({fact->key(), fact});
          }
        }
      });
    }
    // The map for a pid is only created once so this is safe. We are *not*
    // thread safe with respect to concurrent modifications of the FactSet
    // as per spec.
    auto& map = entry.unsafeGetUnlocked();

    // Upper bound of the prefix range: the lexicographically next byte string
    // after the prefix. An empty result means there is no such string, so the
    // range extends to the end of the map.
    const auto next =
        binary::lexicographicallyNext({start.data(), prefix_size});
    return std::make_unique<SeekIterator>(
        map.lower_bound(start),
        next.empty() ? map.end() : map.lower_bound(binary::byteRange(next)));
  } else {
    return std::make_unique<EmptyIterator>();
  }
}

// Add a fact with the given predicate and clause to the set.
//
// Returns:
//  * the freshly allocated id if no fact with this key existed;
//  * the id of the existing fact if one exists with the same key and value;
//  * Id::invalid() if a fact with the same key but a different value exists.
//
// Calls error() if the key exceeds Fact::MAX_KEY_SIZE. The third (unnamed)
// parameter is ignored.
Id FactSet::define(Pid type, Fact::Clause clause, Id) {
  if (clause.key_size > Fact::MAX_KEY_SIZE) {
    error("key too large: {}", clause.key_size);
  }
  const auto next_id = firstFreeId();
  auto fact = Fact::create({next_id, type, clause});
  auto& key_map = keys[type];
  const auto r = key_map.insert(fact.get());
  if (r.second) {
    // New key: the set takes ownership of the fact.
    fact_memory += fact->size();
    facts.push_back(std::move(fact));
    return next_id;
  } else {
    // Key already present: `fact` is discarded when it goes out of scope.
    return fact->value() == (*r.first)->value() ? (*r.first)->id()
                                                : Id::invalid();
  }
}

// Serialize all facts, in iteration order, into a thrift batch.
thrift::Batch FactSet::serialize() const {
  binary::Output output;
  for (auto& fact : *this) {
    fact.serialize(output);
  }

  thrift::Batch batch;
  batch.firstId() = startingId().toThrift();
  batch.count() = size();
  batch.facts() = output.moveToFbString();

  return batch;
}

///
// Serialize facts in the order given by the input range. Preconditions:
//
// * The order must mention only fact IDs in this set.
// * The facts in the set cannot refer to each other (because then we
//   would need to substitute in addition to reordering)
//
// The ordering can omit facts or mention facts multiple times,
// although I'm not sure why you would want to do that.
//
// NOTE(review): batch.count() is set to size(), not order.size(), so it only
// matches the number of serialized facts when `order` is a permutation of the
// set - confirm whether that is intended.
//
thrift::Batch FactSet::serializeReorder(
    folly::Range<const uint64_t*> order) const {
  binary::Output output;
  for (auto i : order) {
    assert(
        i >= startingId().toWord() &&
        i - startingId().toWord() < facts.size());
    facts[i - startingId().toWord()]->serialize(output);
  }

  thrift::Batch batch;
  batch.firstId() = startingId().toThrift();
  batch.count() = size();
  batch.facts() = output.moveToFbString();

  return batch;
}

namespace {

// Apply `substituter` to the clause of `fact` using the predicate registered
// for the fact's type in `inventory`. Returns the rewritten clause bytes
// together with the size of the key within them. The predicate must exist
// (CHECKed).
std::pair<binary::Output, size_t> substituteFact(
    const Inventory& inventory,
    const Substituter& substituter,
    const Fact& fact) {
  auto predicate = inventory.lookupPredicate(fact.type());
  CHECK_NOTNULL(predicate);
  binary::Output clause;
  uint64_t key_size;
  predicate->substitute(substituter, fact.clause(), clause, key_size);
  return {std::move(clause), key_size};
}

} // namespace

// Rebase this set with respect to `subst`.
//
// Facts with ids below subst.finish() are substituted and inserted into
// `global` under the id `subst` maps them to. The remaining facts are
// substituted and defined, in order, in a new FactSet starting at
// subst.firstFreeId(); each is CHECKed to receive the next consecutive id.
// The new FactSet is returned.
FactSet FactSet::rebase(
    const Inventory& inventory,
    const Substitution& subst,
    Store& global) const {
  const auto new_start = subst.firstFreeId();
  Substituter substituter(&subst, distance(subst.finish(), new_start));

  const auto split = lower_bound(subst.finish());

  for (auto& fact : folly::range(begin(), split)) {
    auto r = substituteFact(inventory, substituter, fact);
    global.insert(
        {subst.subst(fact.id()),
         fact.type(),
         Fact::Clause::from(r.first.bytes(), r.second)});
  }

  FactSet local(new_start);
  auto expected = new_start;
  for (auto& fact : folly::range(split, end())) {
    auto r = substituteFact(inventory, substituter, fact);
    const auto id = local.define(
        fact.type(), Fact::Clause::from(r.first.bytes(), r.second));
    CHECK(id == expected);
    ++expected;
  }

  return local;
}

// Move all facts of `other` to the end of this set and merge its key tables
// and memory accounting. Precondition (assert only): appendable(other).
//
// NOTE(review): starting_id is not touched here, although appendable()
// accepts any `other` when this set is empty - confirm callers handle that.
void FactSet::append(FactSet other) {
  assert(appendable(other));

  facts.insert(
      facts.end(),
      std::make_move_iterator(other.facts.begin()),
      std::make_move_iterator(other.facts.end()));

  keys.merge(std::move(other.keys), [](auto& left, const auto& right) {
    left.insert(right.begin(), right.end());
  });

  fact_memory += other.fact_memory;
}

// Whether `other` can be appended to this set: either set is empty, or
// `other` starts exactly at our first free id and none of its keys is already
// present here for the same predicate.
bool FactSet::appendable(const FactSet& other) const {
  if (empty() || other.empty()) {
    return true;
  }

  if (firstFreeId() != other.startingId()) {
    return false;
  }

  for (const auto& k : other.keys) {
    if (const auto* p = keys.lookup(k.first)) {
      for (const auto& fact : k.second) {
        if (p->contains(fact->key())) {
          return false;
        }
      }
    }
  }

  return true;
}

// Storage behind FactSet::Index: a synchronized map from predicate id to a
// heap-allocated entry. Movable, not copyable.
struct FactSet::Index::Impl {
  Impl() = default;
  Impl(Impl&&) = default;
  Impl& operator=(Impl&&) = default;
  Impl(const Impl&) = delete;
  Impl& operator=(const Impl&) = delete;

  // Return the entry for `pid`, creating it on first use.
  entry_t& operator[](Pid pid) {
    // Try getting an existing entry with a reader lock. If there is no entry,
    // obtain a writer lock and create one if it still doesn't exist.
    auto p = index.withRLock([pid](auto& locked) {
      const auto q = locked.lookup(pid);
      return q ? q->get() : nullptr;
    });
    if (!p) {
      p = index.withWLock([pid](auto& locked) {
        auto& r = locked[pid];
        if (r.get() == nullptr) {
          r = std::make_unique<entry_t>();
        }
        return r.get();
      });
    }
    return *p;
  }

  folly::Synchronized<DenseMap<Pid, std::unique_ptr<entry_t>>> index;
};

// Frees the lazily created Impl, if any.
FactSet::Index::~Index() {
  delete impl.load(std::memory_order_relaxed);
}

// Steals the Impl pointer of `other`, leaving it null there.
FactSet::Index::Index(FactSet::Index&& other) noexcept
    : impl(other.impl.exchange(nullptr, std::memory_order_acq_rel)) {}

// Steals the Impl pointer of `other` and frees the one previously held here.
// The pointer is detached from `other` before it is installed, so assigning
// an Index to itself leaves it unchanged.
FactSet::Index& FactSet::Index::operator=(FactSet::Index&& other) {
  auto ptr = other.impl.exchange(nullptr, std::memory_order_acq_rel);
  ptr = impl.exchange(ptr, std::memory_order_release);
  delete ptr;
  return *this;
}

// Return the index entry for `pid`, lazily allocating the Impl on first use.
FactSet::Index::entry_t& FactSet::Index::operator[](Pid pid) {
  // Acquire (not relaxed): if another thread published the Impl through the
  // release CAS below, we must also observe its fully constructed contents
  // before dereferencing the pointer.
  auto p = impl.load(std::memory_order_acquire);
  if (p == nullptr) {
    auto k = std::make_unique<Impl>();
    if (impl.compare_exchange_strong(p, k.get(), std::memory_order_acq_rel)) {
      // We won the race: the atomic now owns the allocation.
      p = k.release();
    }
    // Otherwise the CAS stored the winner's pointer in `p` and our unused
    // Impl is destroyed when `k` goes out of scope.
  }
  return (*p)[pid];
}

bool FactSet::sanityCheck() const {
  // TODO: implement
  return true;
}

} // namespace rts
} // namespace glean
} // namespace facebook