glean/rocksdb/rocksdb.cpp (1,282 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <utility>
#include <folly/Range.h>
#include <folly/container/F14Map.h>
#include <rocksdb/db.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/slice_transform.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/backup_engine.h>
#include "glean/rocksdb/rocksdb.h"
#include "glean/rocksdb/stats.h"
#ifdef FACEBOOK
#include "glean/facebook/rocksdb/rocksdb.h"
#endif
#include "glean/rts/binary.h"
#include "glean/rts/factset.h"
#include "glean/rts/nat.h"
#include "glean/rts/ownership/setu32.h"
#include "glean/rts/timer.h"
namespace facebook {
namespace glean {
namespace rocks {
using namespace rts;
namespace {
[[noreturn]] void error(const rocksdb::Status& s) {
rts::error("rocksdb: " + s.ToString());
}
void check(const rocksdb::Status& status) {
if (!status.ok()) {
error(status);
}
}
folly::ByteRange byteRange(const rocksdb::Slice& slice) {
return folly::ByteRange(
reinterpret_cast<const unsigned char *>(slice.data()), slice.size());
}
rocksdb::Slice slice(const folly::ByteRange& range) {
return rocksdb::Slice(
reinterpret_cast<const char *>(range.data()), range.size());
}
rocksdb::Slice slice(binary::Output& output) {
return slice(output.bytes());
}
template<typename T>
rocksdb::Slice toSlice(const T& x) {
return rocksdb::Slice(
reinterpret_cast<const char *>(&x), sizeof(x));
}
template<typename T>
T fromSlice(const rocksdb::Slice& slice) {
assert(slice.size() == sizeof(T));
T x;
std::memcpy(&x, slice.data(), slice.size());
return x;
}
binary::Input input(const rocksdb::Slice& slice) {
return binary::Input(byteRange(slice));
}
}
namespace {
struct Family {
private:
template<typename F>
Family(const char *n, F&& o, bool keep_ = true)
: index(families.size()), name(n), options(std::forward<F>(o)), keep(keep_)
{
families.push_back(this);
}
Family(const Family&) = delete;
Family& operator=(const Family&) = delete;
static std::vector<const Family *> families;
public:
size_t index;
const char *name;
std::function<void(rocksdb::ColumnFamilyOptions&)> options;
// Whether to keep this column family after the DB is complete. If
// keep = false, then the contents of the column family will be
// deleted before compaction.
bool keep = true;
static const Family admin;
static const Family entities;
static const Family keys;
static const Family stats;
static const Family meta;
static const Family ownershipUnits;
static const Family ownershipUnitIds;
static const Family ownershipRaw;
static const Family ownershipDerivedRaw;
static const Family ownershipSets;
static const Family factOwners;
static size_t count() { return families.size(); }
static const Family * FOLLY_NULLABLE family(const std::string& name) {
for (auto p : families) {
if (name == p->name) {
return p;
}
}
return nullptr;
}
static const Family * FOLLY_NULLABLE family(size_t i) {
return i < families.size() ? families[i] : nullptr;
}
};
std::vector<const Family *> Family::families;
const Family Family::admin("admin", [](auto& opts){
opts.OptimizeForPointLookup(100); });
const Family Family::entities("entities", [](auto& opts){
// NOTE: Setting inplace_update_support=true leads to rocksdb assertion
// failures when iteration backwards.
opts.inplace_update_support = false;
opts.OptimizeForPointLookup(100); });
const Family Family::keys("keys", [](auto& opts) {
opts.prefix_extractor.reset(
rocksdb::NewFixedPrefixTransform(sizeof(Id::word_type))); });
const Family Family::stats("stats", [](auto& opts) {
opts.OptimizeForPointLookup(10); });
const Family Family::meta("meta", [](auto&) {});
const Family Family::ownershipUnits("ownershipUnits", [](auto& opts) {
opts.OptimizeForPointLookup(100); });
const Family Family::ownershipUnitIds("ownershipUnitIds", [](auto& opts) {
opts.OptimizeForPointLookup(100); });
const Family Family::ownershipRaw("ownershipRaw", [](auto&) {}, false);
const Family Family::ownershipDerivedRaw("ownershipDerivedRaw", [](auto& opts) {
opts.inplace_update_support = false; }, false);
const Family Family::ownershipSets("ownershipSets", [](auto& opts){
opts.inplace_update_support = false; });
const Family Family::factOwners("factOwners", [](auto& opts){
opts.inplace_update_support = false; });
enum class AdminId : uint32_t {
NEXT_ID,
VERSION,
STARTING_ID
};
const char *admin_names[] = {
"NEXT_ID",
"VERSION",
"STARTING_ID"
};
struct ContainerImpl final : Container {
Mode mode;
rocksdb::Options options;
rocksdb::WriteOptions writeOptions;
std::unique_ptr<rocksdb::DB> db;
std::vector<rocksdb::ColumnFamilyHandle *> families;
ContainerImpl(
const std::string& path,
Mode m,
folly::Optional<std::shared_ptr<Cache>> cache) {
mode = m;
if (mode == Mode::Create ) {
options.error_if_exists = true;
options.create_if_missing = true;
} else {
options.error_if_exists = false;
options.create_if_missing = false;
}
options.inplace_update_support = true;
options.allow_concurrent_memtable_write = false;
{
rocksdb::BlockBasedTableOptions table_options;
if (cache) {
table_options.block_cache = std::move(cache.value());
}
table_options.filter_policy.reset(
rocksdb::NewBloomFilterPolicy(10, false));
table_options.whole_key_filtering = true;
options.table_factory.reset(
rocksdb::NewBlockBasedTableFactory(table_options));
}
#ifdef FACEBOOK
localOptions(options);
#endif
// options.IncreaseParallelism();
// options.compression = rocksdb::CompressionType::kNoCompression;
// writeOptions.sync = false;
// writeOptions.disableWAL = true;
families.resize(Family::count(), nullptr);
std::vector<std::string> names;
if (mode != Mode::Create) {
check(rocksdb::DB::ListColumnFamilies(options,path,&names));
}
std::vector<rocksdb::ColumnFamilyDescriptor> existing;
std::vector<rocksdb::ColumnFamilyHandle**> ptrs;
for (const auto& name : names) {
if (name != rocksdb::kDefaultColumnFamilyName) {
if (auto family = Family::family(name)) {
rocksdb::ColumnFamilyOptions opts(options);
family->options(opts);
existing.push_back(rocksdb::ColumnFamilyDescriptor(name, opts));
ptrs.push_back(&families[family->index]);
} else {
rts::error("Unknown column family '{}'", name);
}
}
}
existing.push_back(
rocksdb::ColumnFamilyDescriptor(
rocksdb::kDefaultColumnFamilyName, options));
ptrs.push_back(nullptr);
std::vector<rocksdb::ColumnFamilyHandle*> hs;
rocksdb::DB *db_ptr;
check(rocksdb::DB::Open(
options,
path,
existing,
&hs,
&db_ptr
));
if (!db_ptr) {
rts::error("got nullptr from rocksdb");
} else {
db.reset(db_ptr);
}
assert(hs.size() == ptrs.size());
for (size_t i = 0; i < ptrs.size(); ++i) {
if (ptrs[i] != nullptr) {
*ptrs[i] = hs[i];
} else {
db->DestroyColumnFamilyHandle(hs[i]);
}
}
for (size_t i = 0; i < families.size(); ++i) {
if (families[i] == nullptr) {
auto family = Family::family(i);
assert(family != nullptr);
rocksdb::ColumnFamilyOptions opts(options);
family->options(opts);
check(db->CreateColumnFamily(
opts,
family->name,
&families[i]));
}
}
}
ContainerImpl(const ContainerImpl&) = delete;
ContainerImpl(ContainerImpl&& other) = default;
ContainerImpl& operator=(const ContainerImpl&) = delete;
ContainerImpl& operator=(ContainerImpl&&) = delete;
~ContainerImpl() override {
close();
}
void close() noexcept override {
if (db) {
for (auto handle : families) {
if (handle) {
try {
db->DestroyColumnFamilyHandle(handle);
} catch(const std::exception& e) {
LOG(ERROR) << e.what();
} catch(...) {
LOG(ERROR) << "unknown error while closing column family";
}
}
}
families.resize(0);
db.reset();
}
}
void requireOpen() const {
if (!db) {
rts::error("rocksdb: database is closed");
}
}
rocksdb::ColumnFamilyHandle *family(const Family& family) const {
assert(family.index < families.size());
return families[family.index];
}
void writeData(folly::ByteRange key, folly::ByteRange value) override {
requireOpen();
check(db->Put(
writeOptions,
family(Family::meta),
slice(key),
slice(value)));
}
bool readData(
folly::ByteRange key,
std::function<void(folly::ByteRange)> f) override {
requireOpen();
rocksdb::PinnableSlice val;
auto s = db->Get(
rocksdb::ReadOptions(),
family(Family::meta),
slice(key),
&val);
if (s.IsNotFound()) {
return false;
} else {
check(s);
f(byteRange(val));
return true;
}
}
void optimize() override {
for (uint32_t i = 0; i < families.size(); i++) {
auto family = Family::family(i);
auto handle = families[i];
if (handle && family) {
if (!family->keep) {
// delete the contents of this column family
check(db->DropColumnFamily(handle));
db->DestroyColumnFamilyHandle(handle);
rocksdb::ColumnFamilyOptions opts(options);
family->options(opts);
check(db->CreateColumnFamily(
opts,
family->name,
&handle));
families[i] = handle;
}
const auto nlevels = db->NumberLevels(handle);
if (nlevels != 2) {
rocksdb::CompactRangeOptions copts;
copts.change_level = true;
copts.target_level = 1;
check(db->CompactRange(copts, handle, nullptr, nullptr));
}
}
}
}
static std::unique_ptr<rocksdb::BackupEngine> backupEngine(
const std::string& path) {
rocksdb::BackupEngine *p;
check(rocksdb::BackupEngine::Open(
rocksdb::Env::Default(),
rocksdb::BackupEngineOptions(path),
&p));
return std::unique_ptr<rocksdb::BackupEngine>(p);
}
void backup(const std::string& path) override {
requireOpen();
check(backupEngine(path)->CreateNewBackup(db.get(), true));
}
std::unique_ptr<Database> openDatabase(Id start, int32_t version) && override;
};
void serializeEliasFano(binary::Output& out, const OwnerSet& set) {
out.nat(set.size);
out.nat(set.numLowerBits);
out.nat(set.upperSizeBytes);
out.nat(set.skipPointers - set.data.begin());
out.nat(set.forwardPointers - set.data.begin());
out.nat(set.lower - set.data.begin());
out.nat(set.upper - set.data.begin());
out.put(set.data);
}
void deserializeEliasFano(binary::Input& in, OwnerSet& set) {
set.size = in.trustedNat();
set.numLowerBits = in.trustedNat();
set.upperSizeBytes = in.trustedNat();
auto skipPointers = in.trustedNat();
auto forwardPointers = in.trustedNat();
auto lower = in.trustedNat();
auto upper = in.trustedNat();
set.data = in.bytes();
set.skipPointers = set.data.begin() + skipPointers;
set.forwardPointers = set.data.begin() + forwardPointers;
set.lower = set.data.begin() + lower;
set.upper = set.data.begin() + upper;
}
struct DatabaseImpl final : Database {
int64_t db_version;
ContainerImpl container_;
Id starting_id;
Id next_id;
AtomicPredicateStats stats_;
std::vector<size_t> ownership_unit_counters;
folly::F14FastMap<uint64_t,size_t> ownership_derived_counters;
// Cached ownership sets, only used when writing.
// TODO: initialize this lazily
std::unique_ptr<Usets> usets_;
explicit DatabaseImpl(ContainerImpl c, Id start, int64_t version)
: container_(std::move(c)) {
starting_id = Id::fromWord(getAdminValue(
AdminId::STARTING_ID,
start.toWord(),
container_.mode == Mode::Create,
[]{}));
next_id = Id::fromWord(getAdminValue(
AdminId::NEXT_ID,
start.toWord(),
container_.mode == Mode::Create,
[mode = container_.mode] {
if (mode != Mode::Create) {
rts::error("corrupt database - missing NEXT_ID");
}
}));
db_version = getAdminValue(
AdminId::VERSION,
version,
container_.mode == Mode::Create,
[]{});
if (db_version != version) {
rts::error("unexpected database version {}", db_version);
}
stats_.set(loadStats());
ownership_unit_counters = loadOwnershipUnitCounters();
ownership_derived_counters = loadOwnershipDerivedCounters();
usets_ = loadOwnershipSets();
}
DatabaseImpl(const DatabaseImpl&) = delete;
DatabaseImpl& operator=(const DatabaseImpl&) = delete;
DatabaseImpl(DatabaseImpl&&) = delete;
DatabaseImpl& operator=(DatabaseImpl&&) = delete;
Container& container() noexcept override {
return container_;
}
template<typename T, typename F>
T getAdminValue(AdminId id, T def, bool write, F&& notFound) {
container_.requireOpen();
rocksdb::PinnableSlice val;
binary::Output key;
key.fixed(id);
auto s = container_.db->Get(
rocksdb::ReadOptions(),
container_.family(Family::admin),
slice(key),
&val);
if (!s.IsNotFound()) {
check(s);
binary::Input value = input(val);
auto result = value.fixed<T>();
if (!value.empty()) {
rts::error("corrupt database - invalid {}",
admin_names[static_cast<uint32_t>(id)]);
}
return result;
} else {
notFound();
if (write) {
binary::Output value;
value.fixed(def);
check(container_.db->Put(
container_.writeOptions,
container_.family(Family::admin),
slice(key),
slice(value)));
}
return def;
}
}
PredicateStats loadStats() {
container_.requireOpen();
PredicateStats stats;
std::unique_ptr<rocksdb::Iterator> iter(
container_.db->NewIterator(
rocksdb::ReadOptions(),
container_.family(Family::stats)
)
);
if (!iter) {
rts::error("rocksdb: couldn't allocate iterator");
}
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
binary::Input key(byteRange(iter->key()));
stats[key.fixed<Pid>()] = fromSlice<MemoryStats>(iter->value());
assert(key.empty());
}
auto s = iter->status();
if (!s.IsNotFound()) {
check(s);
}
return stats;
}
std::vector<size_t> loadOwnershipUnitCounters() {
container_.requireOpen();
std::vector<size_t> result;
std::unique_ptr<rocksdb::Iterator> iter(
container_.db->NewIterator(
rocksdb::ReadOptions(),
container_.family(Family::ownershipRaw)));
if (!iter) {
rts::error("rocksdb: couldn't allocate iterator");
}
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
binary::Input key(byteRange(iter->key()));
auto id = key.trustedNat();
if (id == result.size()) {
result.push_back(0);
} else if (id+1 == result.size()) {
++result.back();
} else {
rts::error("rocksdb: invalid ownershipUnits");
}
}
return result;
}
folly::F14FastMap<uint64_t,size_t> loadOwnershipDerivedCounters() {
container_.requireOpen();
folly::F14FastMap<uint64_t,size_t> result;
std::unique_ptr<rocksdb::Iterator> iter(
container_.db->NewIterator(
rocksdb::ReadOptions(),
container_.family(Family::ownershipDerivedRaw)));
if (!iter) {
rts::error("rocksdb: couldn't allocate iterator");
}
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
binary::Input key(byteRange(iter->key()));
auto pid = key.trustedNat();
const auto [i,_] = result.insert({pid,0});
++i->second;
}
VLOG(1) << "derived fact owners for " << result.size() << " pids";
return result;
}
std::unique_ptr<Usets> loadOwnershipSets() {
auto t = makeAutoTimer("loadOwnershipSets");
auto iter = this->getSetIterator();
auto [first,size] = iter->sizes();
auto usets = std::make_unique<Usets>(first + size);
while (const auto pair = iter->get()) {
auto set = SetU32::fromEliasFano(*pair->second.set);
auto p = usets->add(std::move(set),0);
p->id = pair->first;
}
auto stats = usets->statistics();
LOG(INFO) << "loadOwnershipSets loaded " << stats.adds << " sets, " <<
stats.bytes << " bytes";
return usets;
}
folly::Optional<uint32_t> getUnitId(folly::ByteRange unit) override {
rocksdb::PinnableSlice val;
auto s = container_.db->Get(
rocksdb::ReadOptions(),
container_.family(Family::ownershipUnits),
slice(unit),
&val);
if (!s.IsNotFound()) {
check(s);
assert(val.size() == sizeof(uint32_t));
return folly::loadUnaligned<uint32_t>(val.data());
} else {
return folly::none;
}
}
folly::Optional<std::string> getUnit(uint32_t unit_id) override {
rocksdb::PinnableSlice val;
auto s = container_.db->Get(
rocksdb::ReadOptions(),
container_.family(Family::ownershipUnitIds),
slice(EncodedNat(unit_id).byteRange()),
&val);
if (!s.IsNotFound()) {
check(s);
return val.ToString();
} else {
return folly::none;
}
}
rts::Id startingId() const override {
return starting_id;
}
rts::Id firstFreeId() const override {
return next_id;
}
Id idByKey(Pid type, folly::ByteRange key) override {
if (count(type).high() == 0) {
return Id::invalid();
}
container_.requireOpen();
rocksdb::PinnableSlice out;
binary::Output k;
k.fixed(type);
k.put(key);
auto s = container_.db->Get(
rocksdb::ReadOptions(),
container_.family(Family::keys),
slice(k),
&out);
if (s.IsNotFound()) {
return Id::invalid();
} else {
check(s);
binary::Input value = input(out);
auto id = value.fixed<Id>();
assert(value.empty());
return id;
}
}
bool lookupById(Id id, rocksdb::PinnableSlice &val) const {
if (id < startingId() || id >= firstFreeId()) {
return false;
}
binary::Output key;
if (db_version <= 2) {
key.fixed(id);
} else {
key.nat(id.toWord());
}
val.Reset();
auto s = container_.db->Get(
rocksdb::ReadOptions(),
container_.family(Family::entities),
slice(key),
&val
);
if (s.IsNotFound()) {
return false;
} else {
check(s);
return true;
}
}
static rts::Fact::Ref decomposeFact(Id id, const rocksdb::Slice& data) {
auto inp = input(data);
const auto ty = inp.packed<Pid>();
const auto key_size = inp.packed<uint32_t>();
return
rts::Fact::Ref{id, ty, rts::Fact::Clause::from(inp.bytes(), key_size)};
}
Pid typeById(Id id) override {
container_.requireOpen();
rocksdb::PinnableSlice val;
if (lookupById(id, val)) {
return input(val).packed<Pid>();
} else {
return Pid::invalid();
}
}
bool factById(Id id, std::function<void(Pid, Fact::Clause)> f) override {
container_.requireOpen();
rocksdb::PinnableSlice val;
if (lookupById(id, val)) {
auto ref = decomposeFact(id, val);
f(ref.type, ref.clause);
return true;
} else {
return false;
}
}
struct SeekIterator final : rts::FactIterator {
SeekIterator(
folly::ByteRange start,
size_t prefix_size,
Pid type,
const DatabaseImpl *db)
: upper_bound_(binary::lexicographicallyNext({start.data(), prefix_size}))
, upper_bound_slice_(
reinterpret_cast<const char *>(upper_bound_.data()),
upper_bound_.size())
, type_(type)
, db_(db)
{
assert(prefix_size <= start.size());
// both upper_bound_slice_ and options_ need to be alive for the duration
// of the iteration
options_.iterate_upper_bound = &upper_bound_slice_;
iter_.reset(
db->container_.db->NewIterator(
options_,
db->container_.family(Family::keys)));
if (iter_) {
iter_->Seek(slice(start));
} else {
rts::error("rocksdb: couldn't allocate iterator");
}
}
void next() override {
iter_->Next();
auto s = iter_->status();
if (!s.IsNotFound()) {
check(s);
}
}
Fact::Ref get(Demand demand) override {
if (iter_->Valid()) {
auto key = input(iter_->key());
auto ty = key.fixed<Pid>();
assert(ty == type_);
auto value = input(iter_->value());
auto id = value.fixed<Id>();
assert(value.empty());
if (demand == KeyOnly) {
return Fact::Ref{id, type_, Fact::Clause::fromKey(key.bytes())};
} else {
auto found = db_->lookupById(id, slice_);
assert(found);
return decomposeFact(id, slice_);
}
} else {
return Fact::Ref::invalid();
}
}
const std::vector<unsigned char> upper_bound_;
const rocksdb::Slice upper_bound_slice_;
const Pid type_;
rocksdb::ReadOptions options_;
std::unique_ptr<rocksdb::Iterator> iter_;
const DatabaseImpl *db_;
rocksdb::PinnableSlice slice_;
};
std::unique_ptr<rts::FactIterator> seek(
Pid type, folly::ByteRange start, size_t prefix_size) override {
assert(prefix_size <= start.size());
if (count(type).high() == 0) {
return std::make_unique<EmptyIterator>();
}
container_.requireOpen();
binary::Output out;
out.fixed(type);
const auto type_size = out.size();
out.put(start);
return std::make_unique<SeekIterator>(
out.bytes(),
type_size + prefix_size,
type,
this);
}
// Legacy DBs don't store fact ids as 8 byte little-endian numbers so we can't
// seek over those in lexicographic order. Instead, just look up each fact id
// in the DB (much slower).
template<typename Direction>
struct LegacyEnumerateIterator final : rts::FactIterator {
LegacyEnumerateIterator(Id from, Id stop, const DatabaseImpl *db)
: stop_(stop)
, db_(db) {
find(from);
}
void next() override {
find(Direction::advance(id_));
}
Fact::Ref get(Demand) override {
return Direction::more(id_, stop_)
? decomposeFact(id_, slice_)
: Fact::Ref::invalid();
}
void find(Id from) {
for (id_ = from;
Direction::more(id_, stop_) && !db_->lookupById(id_, slice_);
id_ = Direction::advance(id_))
{}
}
Id id_;
Id stop_;
const DatabaseImpl *db_;
rocksdb::PinnableSlice slice_;
};
template<typename Direction>
struct EnumerateIterator final : rts::FactIterator {
static std::vector<char> encode(Id id) {
std::vector<char> v(rts::MAX_NAT_SIZE);
const auto n = rts::storeNat(
reinterpret_cast<unsigned char *>(v.data()),
id.toWord());
v.resize(n);
return v;
}
explicit EnumerateIterator(Id start, Id bound, const DatabaseImpl *db)
: bound_(encode(bound))
, bound_slice_(bound_.data(), bound_.size())
{
// both the slice and options_ need to be alive for the duration
// of the iteration
options_.*Direction::iterate_bound = &bound_slice_;
iter_.reset(
db->container_.db->NewIterator(
options_,
db->container_.family(Family::entities)));
auto st = encode(start);
if (iter_) {
(iter_.get()->*Direction::seek)({st.data(), st.size()});
} else {
rts::error("rocksdb: couldn't allocate iterator");
}
}
void next() override {
(iter_.get()->*Direction::next)();
auto s = iter_->status();
if (!s.IsNotFound()) {
check(s);
}
}
Fact::Ref get(Demand demand) override {
return iter_->Valid()
? decomposeFact(
Id::fromWord(
loadTrustedNat(
reinterpret_cast<const unsigned char *>(
iter_->key().data())).first),
iter_->value())
: Fact::Ref::invalid();
}
const std::vector<char> bound_;
const rocksdb::Slice bound_slice_;
rocksdb::ReadOptions options_;
std::unique_ptr<rocksdb::Iterator> iter_;
};
struct Forward {
static std::pair<Id,Id> bounds(
Id from, Id upto, Id starting_id, Id next_id) {
if (from >= next_id || (upto && upto <= starting_id)) {
return {Id::invalid(), Id::invalid()};
} else {
return {
std::max(from, starting_id),
upto && upto <= next_id ? upto : next_id
};
}
}
static inline constexpr auto iterate_bound =
&rocksdb::ReadOptions::iterate_upper_bound;
static inline constexpr auto seek = &rocksdb::Iterator::Seek;
static inline constexpr auto next = &rocksdb::Iterator::Next;
// legacy iterator support
static Id advance(Id id) { return id + 1; }
static bool more(Id current, Id stop) { return current < stop; }
};
struct Backward {
static std::pair<Id,Id> bounds(
Id from, Id downto, Id starting_id, Id next_id) {
if (downto >= next_id || (from && from <= starting_id)) {
return {Id::invalid(), Id::invalid()};
} else {
return {
(from && from <= next_id ? from : next_id) - 1,
std::max(downto, starting_id)
};
}
}
static inline constexpr auto iterate_bound =
&rocksdb::ReadOptions::iterate_lower_bound;
static inline constexpr auto seek = &rocksdb::Iterator::SeekForPrev;
static inline constexpr auto next = &rocksdb::Iterator::Prev;
// legacy iterator support
static Id advance(Id id) { return id - 1; }
static bool more(Id current, Id stop) { return current >= stop; }
};
template<typename Direction>
std::unique_ptr<rts::FactIterator> makeEnumerateIterator(Id from, Id to) {
container_.requireOpen();
const auto [start, bound] = Direction::bounds(
from, to, startingId(), firstFreeId());
if (!start) {
return std::make_unique<rts::EmptyIterator>();
} else if (db_version <= 2) {
return std::make_unique<LegacyEnumerateIterator<Direction>>(
start, bound, this);
} else {
return std::make_unique<EnumerateIterator<Direction>>(start, bound, this);
}
}
std::unique_ptr<rts::FactIterator> enumerate(Id from, Id upto) override {
return makeEnumerateIterator<Forward>(from, upto);
}
std::unique_ptr<rts::FactIterator> enumerateBack(
Id from, Id downto) override {
return makeEnumerateIterator<Backward>(from, downto);
}
rts::Interval count(Pid pid) const override {
return stats_.count(pid);
}
PredicateStats stats() const override {
return stats_.get();
}
void commit(rts::FactSet& facts) override {
container_.requireOpen();
if (facts.empty()) {
return;
}
if (facts.startingId() < next_id) {
rts::error("batch inserted out of sequence ({} < {})",
facts.startingId(),
next_id);
}
rocksdb::WriteBatch batch;
// NOTE: We do *not* support concurrent writes so we don't need to protect
// stats_ here because nothing should be able to replace it while we're
// running
const auto& old_stats = stats_.unprotected();
PredicateStats new_stats(old_stats);
for (auto iter = facts.enumerate(); auto fact = iter->get(); iter->next()) {
assert(fact.id >= next_id);
uint64_t mem = 0;
auto put = [&](auto family, const auto& key, const auto& value) {
check(batch.Put(
family,
key,
value
));
mem += key.size();
mem += value.size();
};
{
binary::Output k;
if (db_version <= 2) {
k.fixed(fact.id);
} else {
k.nat(fact.id.toWord());
}
binary::Output v;
v.packed(fact.type);
v.packed(fact.clause.key_size);
v.put({fact.clause.data, fact.clause.size()});
put(container_.family(Family::entities), slice(k), slice(v));
}
{
binary::Output k;
k.fixed(fact.type);
k.put(fact.key());
binary::Output v;
v.fixed(fact.id);
put(container_.family(Family::keys), slice(k), slice(v));
}
new_stats[fact.type] += MemoryStats::one(mem);
}
const auto first_free_id = facts.firstFreeId();
check(batch.Put(
container_.family(Family::admin),
toSlice(AdminId::NEXT_ID),
toSlice(first_free_id)));
for (const auto& x : new_stats) {
if (x.second != old_stats.get(x.first)) {
check(batch.Put(
container_.family(Family::stats),
toSlice(x.first.toWord()),
toSlice(x.second)));
}
}
check(container_.db->Write(container_.writeOptions, &batch));
next_id = first_free_id;
stats_.set(std::move(new_stats));
}
void addOwnership(const std::vector<OwnershipSet>& ownership) override {
container_.requireOpen();
if (ownership.empty()) {
return;
}
size_t new_count = 0;
std::vector<size_t> touched;
rocksdb::WriteBatch batch;
for (const auto& set : ownership) {
uint32_t unit_id;
auto res = getUnitId(set.unit);
if (res.hasValue()) {
unit_id = *res;
if (unit_id >= ownership_unit_counters.size()) {
rts::error("inconsistent unit id {}", unit_id);
}
touched.push_back(unit_id);
} else {
unit_id = ownership_unit_counters.size() + new_count;
check(batch.Put(
container_.family(Family::ownershipUnits),
slice(set.unit),
toSlice(unit_id)));
check(batch.Put(
container_.family(Family::ownershipUnitIds),
slice(EncodedNat(unit_id).byteRange()),
slice(set.unit)));
++new_count;
}
binary::Output key;
key.nat(unit_id);
key.nat(unit_id < ownership_unit_counters.size()
? ownership_unit_counters[unit_id]
: 0);
check(batch.Put(
container_.family(Family::ownershipRaw),
slice(key),
rocksdb::Slice(
reinterpret_cast<const char *>(set.ids.data()),
set.ids.size() * sizeof(int64_t))));
}
check(container_.db->Write(container_.writeOptions, &batch));
for (auto i : touched) {
assert(i < ownership_unit_counters.size());
++ownership_unit_counters[i];
}
ownership_unit_counters.insert(
ownership_unit_counters.end(), new_count, 1);
}
std::unique_ptr<rts::DerivedFactOwnershipIterator>
getDerivedFactOwnershipIterator(Pid pid) override {
struct DerivedFactIterator : rts::DerivedFactOwnershipIterator {
explicit DerivedFactIterator(
Pid pid, std::unique_ptr<rocksdb::Iterator> i)
: pid_(pid), iter(std::move(i))
{}
folly::Optional<DerivedFactOwnership> get() override {
if (iter->Valid()) {
binary::Input key(byteRange(iter->key()));
auto pid = key.trustedNat();
if (pid != pid_.toWord()) {
return {};
}
const auto val = iter->value();
const size_t elts = val.size() /
(sizeof(uint32_t) + sizeof(uint64_t));
const Id* ids = reinterpret_cast<const Id *>(val.data());
const UsetId* owners = reinterpret_cast<const UsetId *>(
val.data() + elts * sizeof(uint64_t));
iter->Next();
return rts::DerivedFactOwnership{
{ ids, elts },
{ owners, elts }
};
} else {
return folly::none;
}
}
Pid pid_;
std::unique_ptr<rocksdb::Iterator> iter;
};
std::unique_ptr<rocksdb::Iterator> iter(
container_.db->NewIterator(
rocksdb::ReadOptions(),
container_.family(Family::ownershipDerivedRaw)));
if (!iter) {
rts::error("rocksdb: couldn't allocate derived ownership iterator");
}
EncodedNat key(pid.toWord());
iter->Seek(slice(key.byteRange()));
return std::make_unique<DerivedFactIterator>(pid, std::move(iter));
}
std::unique_ptr<rts::OwnershipUnitIterator>
getOwnershipUnitIterator() override {
struct UnitIterator : rts::OwnershipUnitIterator {
explicit UnitIterator(std::unique_ptr<rocksdb::Iterator> i)
: iter(std::move(i))
{}
folly::Optional<rts::OwnershipUnit> get() override {
if (iter->Valid()) {
binary::Input key(byteRange(iter->key()));
auto unit = key.trustedNat();
const auto val = iter->value();
iter->Next();
return rts::OwnershipUnit{
static_cast<uint32_t>(unit),
{ reinterpret_cast<const OwnershipUnit::Ids *>(val.data()),
val.size() / sizeof(OwnershipUnit::Ids) }
};
} else {
return {};
}
}
std::unique_ptr<rocksdb::Iterator> iter;
};
std::unique_ptr<rocksdb::Iterator> iter(
container_.db->NewIterator(
rocksdb::ReadOptions(),
container_.family(Family::ownershipRaw)));
if (!iter) {
rts::error("rocksdb: couldn't allocate ownership unit iterator");
}
iter->SeekToFirst();
return std::make_unique<UnitIterator>(std::move(iter));
}
void storeOwnership(ComputedOwnership &ownership) override;
std::unique_ptr<rts::Ownership> getOwnership() override;
std::unique_ptr<rts::OwnershipSetIterator> getSetIterator();
void addDefineOwnership(DefineOwnership& define) override;
void putOwnerSet(
rocksdb::WriteBatch& batch,
UsetId id,
SetOp op,
const OwnerSet& set) const {
binary::Output key;
key.nat(id);
binary::Output value;
value.nat(op);
serializeEliasFano(value, set);
check(batch.Put(container_.family(Family::ownershipSets), slice(key),
slice(value)));
}
};
void DatabaseImpl::storeOwnership(ComputedOwnership &ownership) {
container_.requireOpen();
if (ownership.sets_.size() > 0) {
auto t = makeAutoTimer("storeOwnership(sets)");
rocksdb::WriteBatch batch;
uint32_t id = ownership.firstId_;
for (auto &exp : ownership.sets_) {
if ((id % 1000000) == 0) {
VLOG(1) << "storeOwnership: " << id;
}
putOwnerSet(batch, id, exp.op, exp.set);
id++;
}
VLOG(1) << "storeOwnership: writing sets (" <<
ownership.sets_.size() << ")";
check(container_.db->Write(container_.writeOptions, &batch));
}
// ToDo: just update usets_, don't load the whole thing
usets_ = loadOwnershipSets();
if (ownership.facts_.size() > 0) {
auto t = makeAutoTimer("storeOwnership(facts)");
rocksdb::WriteBatch batch;
for (uint64_t i = 0; i < ownership.facts_.size(); i++) {
auto id = ownership.facts_[i].first;
auto usetid = ownership.facts_[i].second;
EncodedNat key(id.toWord());
EncodedNat val(usetid);
check(batch.Put(container_.family(Family::factOwners),
slice(key.byteRange()),
slice(val.byteRange())));
}
VLOG(1) << "storeOwnership: writing facts: " <<
ownership.facts_.size() << " intervals";
check(container_.db->Write(container_.writeOptions, &batch));
}
}
struct StoredOwnership : Ownership {
explicit StoredOwnership(DatabaseImpl *db) : db_(db) {}
UsetId getOwner(Id id) override;
UsetId nextSetId() override {
return db_->usets_->getNextId();
}
UsetId lookupSet(Uset* uset) override {
auto existing = db_->usets_->lookup(uset);
if (existing) {
return existing->id;
} else {
return INVALID_USET;
}
}
folly::Optional<SetExpr<SetU32>> getUset(UsetId id) override {
rocksdb::PinnableSlice val;
auto s = db_->container_.db->Get(
rocksdb::ReadOptions(),
db_->container_.family(Family::ownershipSets),
slice(EncodedNat(id).byteRange()),
&val);
binary::Input inp(byteRange(val));
if (!s.IsNotFound()) {
check(s);
SetExpr<SetU32> exp;
exp.op = static_cast<SetOp>(inp.trustedNat());
OwnerSet efset;
deserializeEliasFano(inp, efset);
exp.set = SetU32::fromEliasFano(efset);
return exp;
} else {
return folly::none;
}
}
std::unique_ptr<rts::OwnershipSetIterator> getSetIterator() override {
return db_->getSetIterator();
}
private:
DatabaseImpl *db_;
};
std::unique_ptr<rts::Ownership> DatabaseImpl::getOwnership() {
container_.requireOpen();
return std::make_unique<StoredOwnership>(this);
}
UsetId StoredOwnership::getOwner(Id id) {
EncodedNat key(id.toWord());
std::unique_ptr<rocksdb::Iterator> iter(db_->container_.db->NewIterator(
rocksdb::ReadOptions(), db_->container_.family(Family::factOwners)));
iter->SeekForPrev(slice(key.byteRange()));
if (!iter->Valid()) {
return INVALID_USET;
}
binary::Input val(byteRange(iter->value()));
return val.trustedNat();
}
void DatabaseImpl::addDefineOwnership(DefineOwnership& def) {
auto t = makeAutoTimer("addDefineOwnership");
container_.requireOpen();
VLOG(1) << "addDefineOwnership: " <<
def.owners_.size() + def.new_owners_.size() << " owners, " <<
def.usets_.size() << " sets";
if (def.newSets_.size() > 0) {
folly::F14FastMap<UsetId,UsetId> substitution;
auto subst = [&](uint32_t old) -> uint32_t {
auto n = substitution.find(old);
if (n == substitution.end()) {
return old;
} else {
return n->second;
}
};
rocksdb::WriteBatch batch;
size_t numNewSets = 0;
for (auto uset : def.newSets_) {
std::set<UsetId> s;
uset->exp.set.foreach([&](uint32_t elt) {
s.insert(subst(elt));
});
SetU32 set = SetU32::from(s);
auto newUset = std::make_unique<Uset>(std::move(set), uset->exp.op, 0);
auto p = newUset.get();
auto oldId = uset->id;
auto q = usets_->add(std::move(newUset));
if (p == q) {
usets_->promote(p);
auto ownerset = p->toEliasFano();
putOwnerSet(batch, p->id, ownerset.op, ownerset.set);
ownerset.set.free();
numNewSets++;
}
VLOG(2) << "rebased set " << oldId << " -> " << q->id;
substitution[oldId] = q->id;
}
VLOG(1) << "addDefineOwnership: writing sets (" << numNewSets << ")";
check(container_.db->Write(container_.writeOptions, &batch));
for (auto& owner : def.owners_) {
owner = subst(owner);
}
}
// ownershipDerivedRaw :: (Pid,nat) -> vector<int64_t>
//
// Similarly to ownershipRaw, this is basically just an
// append-only log. The nat in the key is a per-Pid counter that
// we bump by one each time we add another batch of data for a
// Pid.
binary::Output key;
key.nat(def.pid_.toWord());
const auto [it, _] = ownership_derived_counters.insert(
{def.pid_.toWord(), 0});
key.nat(it->second++);
rocksdb::WriteBatch batch;
binary::Output val;
val.bytes(
def.ids_.data(),
def.ids_.size() *
sizeof(std::remove_reference<decltype(def.ids_)>::type::value_type));
val.bytes(
def.new_ids_.data(),
def.new_ids_.size() *
sizeof(std::remove_reference<decltype(def.new_ids_)>::type::value_type));
val.bytes(
def.owners_.data(),
def.owners_.size() *
sizeof(std::remove_reference<decltype(def.owners_)>::type::value_type));
val.bytes(
def.new_owners_.data(),
def.new_owners_.size() *
sizeof(std::remove_reference<decltype(def.new_owners_)>::type::value_type));
check(batch.Put(
container_.family(Family::ownershipDerivedRaw),
slice(key),
slice(val)));
check(container_.db->Write(container_.writeOptions, &batch));
VLOG(1) << "addDefineOwnership wrote " <<
def.ids_.size() + def.new_ids_.size() <<
" entries for pid " << def.pid_.toWord();
}
std::unique_ptr<rts::OwnershipSetIterator>
DatabaseImpl::getSetIterator() {
struct SetIterator : rts::OwnershipSetIterator {
explicit SetIterator(size_t first, size_t size,
std::unique_ptr<rocksdb::Iterator> i)
: first_(first), size_(size), iter(std::move(i)) {}
folly::Optional<std::pair<UsetId, SetExpr<const OwnerSet *>>> get()
override {
if (iter->Valid()) {
binary::Input key(byteRange(iter->key()));
auto usetid = key.trustedNat();
binary::Input val(byteRange(iter->value()));
iter->Next();
exp.op = static_cast<SetOp>(val.trustedNat());
deserializeEliasFano(val, exp.set);
return std::pair<uint32_t, SetExpr<const OwnerSet *>>(
usetid, { exp.op, &exp.set });
} else {
return folly::none;
}
}
std::pair<size_t,size_t> sizes() const override {
return { first_, size_ };
}
SetExpr<OwnerSet> exp;
size_t first_, size_;
std::unique_ptr<rocksdb::Iterator> iter;
};
std::unique_ptr<rocksdb::Iterator> iter(container_.db->NewIterator(
rocksdb::ReadOptions(),
container_.family(Family::ownershipSets)));
if (!iter) {
rts::error("rocksdb: couldn't allocate ownership set iterator");
}
size_t first, last, size;
iter->SeekToLast();
if (!iter->Valid()) {
last = 0;
} else {
binary::Input key(byteRange(iter->key()));
last = key.trustedNat();
}
iter->SeekToFirst();
if (!iter->Valid()) {
first = 0;
size = 0;
} else {
binary::Input key(byteRange(iter->key()));
first = key.trustedNat();
size = last - first + 1;
}
return std::make_unique<SetIterator>(
first, size, std::move(iter));
}
std::unique_ptr<Database> ContainerImpl::openDatabase(
Id start, int32_t version) && {
return std::make_unique<DatabaseImpl>(std::move(*this), start, version);
}
}
std::unique_ptr<Container> open(
const std::string& path,
Mode mode,
folly::Optional<std::shared_ptr<Cache>> cache) {
return std::make_unique<ContainerImpl>(path, mode, std::move(cache));
}
std::shared_ptr<Cache> newCache(size_t capacity) {
return rocksdb::NewLRUCache(capacity);
}
void restore(const std::string& target, const std::string& source) {
check(
ContainerImpl::backupEngine(source)
->RestoreDBFromLatestBackup(target,target));
}
}
}
}