src/fdb/FDB.cc (231 lines of code) (raw):
#include "FDB.h"
#include <atomic>
#include <folly/CancellationToken.h>
#include <folly/Likely.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/CurrentExecutor.h>
#include <folly/experimental/coro/DetachOnCancel.h>
#include <folly/logging/xlog.h>
#include "foundationdb/fdb_c_types.h"
namespace hf3fs::kv::fdb {
#define TOU8(str) reinterpret_cast<const uint8_t *>(str.data()), str.length()
#define VIEW(name) std::string_view(reinterpret_cast<const char *>(name), name##_length)
#define SELECTOR(selector) TOU8(selector.key), selector.orEqual, selector.offset
template <>
void Result<Int64Result, int64_t>::extractValue() {
error_ = fdb_future_get_int64(future_.get(), &value_);
}
template <>
void Result<KeyResult, String>::extractValue() {
const uint8_t *data;
int data_length;
error_ = fdb_future_get_key(future_.get(), &data, &data_length);
if (error_ == 0) {
value_ = VIEW(data);
}
}
template <>
void Result<ValueResult, std::optional<String>>::extractValue() {
fdb_bool_t present = false;
const uint8_t *data;
int data_length;
error_ = fdb_future_get_value(future_.get(), &present, &data, &data_length);
if (error_ == 0 && present) {
value_ = VIEW(data);
}
}
template <>
void Result<KeyArrayResult, std::vector<String>>::extractValue() {
const FDBKey *keys;
int count = 0;
error_ = fdb_future_get_key_array(future_.get(), &keys, &count);
if (error_ == 0 && count) {
value_.reserve(count);
for (int i = 0; i < count; ++i) {
value_.emplace_back(VIEW(keys[i].key));
}
}
}
template <>
void Result<StringArrayResult, std::vector<String>>::extractValue() {
const char **strings;
int count = 0;
error_ = fdb_future_get_string_array(future_.get(), &strings, &count);
if (error_ == 0 && count) {
value_.reserve(count);
for (int i = 0; i < count; ++i) {
value_.emplace_back(strings[i]);
}
}
}
template <>
void Result<KeyValueArrayResult, std::pair<std::vector<KeyValue>, bool>>::extractValue() {
const FDBKeyValue *kv;
int count;
fdb_bool_t more = false;
error_ = fdb_future_get_keyvalue_array(future_.get(), &kv, &count, &more);
if (error_ == 0 && count) {
auto &vec = value_.first;
vec.reserve(count);
for (int i = 0; i < count; ++i) {
vec.emplace_back(VIEW(kv[i].key), VIEW(kv[i].value));
}
value_.second = more;
}
}
template <>
void Result<KeyRangeArrayResult, std::vector<KeyRange>>::extractValue() {
const FDBKeyRange *ranges;
int count;
error_ = fdb_future_get_keyrange_array(future_.get(), &ranges, &count);
if (error_ == 0 && count) {
value_.reserve(count);
for (int i = 0; i < count; ++i) {
KeyRange range;
range.beginKey = VIEW(ranges[i].begin_key);
range.endKey = VIEW(ranges[i].end_key);
value_.push_back(std::move(range));
}
}
}
template <>
void Result<EmptyResult, EmptyValue>::extractValue() {}
static void coroCallback(FDBFuture *, void *para) {
auto baton = static_cast<folly::coro::Baton *>(para);
baton->post();
}
template <class T, class V>
Task<T> Result<T, V>::toTask(FDBFuture *f) {
T result;
result.future_.reset(f);
folly::coro::Baton baton;
result.error_ = fdb_future_set_callback(f, coroCallback, &baton);
if (result.error()) {
co_return result;
}
std::atomic_bool cancel = false;
auto token = co_await folly::coro::co_current_cancellation_token;
folly::CancellationCallback cb(token, [&]() {
cancel = true;
fdb_future_cancel(f);
});
co_await baton;
if (cancel.load()) {
throw folly::OperationCancelled();
}
result.error_ = fdb_future_get_error(f);
if (result.error()) {
co_return result;
}
result.extractValue();
co_return result;
}
// Global
fdb_error_t DB::selectAPIVersion(int version) { return fdb_select_api_version(version); }
std::string_view DB::errorMsg(fdb_error_t code) { return fdb_get_error(code); }
bool DB::evaluatePredicate(int predicate_test, fdb_error_t code) { return fdb_error_predicate(predicate_test, code); }
// Network
fdb_error_t DB::setNetworkOption(FDBNetworkOption option, std::string_view value /* = {} */) {
return fdb_network_set_option(option, TOU8(value));
}
fdb_error_t DB::setupNetwork() { return fdb_setup_network(); }
fdb_error_t DB::runNetwork() { return fdb_run_network(); }
fdb_error_t DB::stopNetwork() { return fdb_stop_network(); }
// DB
fdb_error_t DB::setOption(FDBDatabaseOption option, std::string_view value) {
return fdb_database_set_option(db_.get(), option, TOU8(value));
}
Task<Int64Result> DB::rebootWorker(std::string_view address, bool check /* = false */, int duration /* = 0 */) {
co_return co_await Int64Result::toTask(fdb_database_reboot_worker(db_.get(), TOU8(address), check, duration));
}
Task<EmptyResult> DB::forceRecoveryWithDataLoss(std::string_view dcid) {
co_return co_await EmptyResult::toTask(fdb_database_force_recovery_with_data_loss(db_.get(), TOU8(dcid)));
}
Task<EmptyResult> DB::createSnapshot(std::string_view uid, std::string_view snapCommand) {
co_return co_await EmptyResult::toTask(fdb_database_create_snapshot(db_.get(), TOU8(uid), TOU8(snapCommand)));
}
Task<KeyResult> DB::purgeBlobGranules(const KeyRangeView &range, int64_t purgeVersion, fdb_bool_t force) {
co_return co_await KeyResult::toTask(
fdb_database_purge_blob_granules(db_.get(), TOU8(range.beginKey), TOU8(range.endKey), purgeVersion, force));
}
Task<EmptyResult> DB::waitPurgeGranulesComplete(std::string_view purgeKey) {
co_return co_await EmptyResult::toTask(fdb_database_wait_purge_granules_complete(db_.get(), TOU8(purgeKey)));
}
void Transaction::reset() { fdb_transaction_reset(tr_.get()); }
void Transaction::cancel() { fdb_transaction_cancel(tr_.get()); }
[[nodiscard]] fdb_error_t Transaction::setOption(FDBTransactionOption option, std::string_view value /* = {} */) {
return fdb_transaction_set_option(tr_.get(), option, TOU8(value));
}
void Transaction::setReadVersion(int64_t version) { fdb_transaction_set_read_version(tr_.get(), version); }
Task<Int64Result> Transaction::getReadVersion() {
co_return co_await Int64Result::toTask(fdb_transaction_get_read_version(tr_.get()));
}
Task<Int64Result> Transaction::getApproximateSize() {
co_return co_await Int64Result::toTask(fdb_transaction_get_approximate_size(tr_.get()));
}
Task<KeyResult> Transaction::getVersionstamp() {
co_return co_await KeyResult::toTask(fdb_transaction_get_versionstamp(tr_.get()));
}
Task<ValueResult> Transaction::get(std::string_view key, fdb_bool_t snapshot /* = false */) {
co_return co_await ValueResult::toTask(fdb_transaction_get(tr_.get(), TOU8(key), snapshot));
}
Task<KeyResult> Transaction::getKey(const KeySelector &selector, fdb_bool_t snapshot /* = false */) {
co_return co_await KeyResult::toTask(fdb_transaction_get_key(tr_.get(), SELECTOR(selector), snapshot));
}
Task<StringArrayResult> Transaction::getAddressesForKey(std::string_view key) {
co_return co_await StringArrayResult::toTask(fdb_transaction_get_addresses_for_key(tr_.get(), TOU8(key)));
}
Task<KeyValueArrayResult> Transaction::getRange(const KeySelector &begin,
const KeySelector &end,
GetRangeLimits limits /* = GetRangeLimits() */,
int iteration /* = 0 */,
bool snapshot /* = false */,
bool reverse /* = false */,
FDBStreamingMode streamingMode /* = FDB_STREAMING_MODE_SERIAL */) {
co_return co_await KeyValueArrayResult::toTask(fdb_transaction_get_range(tr_.get(),
SELECTOR(begin),
SELECTOR(end),
limits.rows.value_or(-1),
limits.bytes.value_or(-1),
streamingMode,
iteration,
snapshot,
reverse));
}
Task<Int64Result> Transaction::getEstimatedRangeSizeBytes(const KeyRangeView &range) {
co_return co_await Int64Result::toTask(
fdb_transaction_get_estimated_range_size_bytes(tr_.get(), TOU8(range.beginKey), TOU8(range.endKey)));
}
Task<KeyArrayResult> Transaction::getRangeSplitPoints(const KeyRangeView &range, int64_t chunkSize) {
co_return co_await KeyArrayResult::toTask(
fdb_transaction_get_range_split_points(tr_.get(), TOU8(range.beginKey), TOU8(range.endKey), chunkSize));
}
Task<EmptyResult> Transaction::watch(std::string_view key) {
co_return co_await EmptyResult::toTask(fdb_transaction_watch(tr_.get(), TOU8(key)));
}
Task<EmptyResult> Transaction::commit() {
if (UNLIKELY(readonly_)) {
// Prevent tools like admin_cli or fsck from mistakenly modifying data
XLOGF(CRITICAL, "disallow call commit on a read-only FDBContext!!!");
EmptyResult result;
result.error_ = 1000; /* operation failed */
co_return result;
}
co_return co_await EmptyResult::toTask(fdb_transaction_commit(tr_.get()));
}
Task<EmptyResult> Transaction::onError(fdb_error_t err) {
co_return co_await EmptyResult::toTask(fdb_transaction_on_error(tr_.get(), err));
}
void Transaction::clear(std::string_view key) { return fdb_transaction_clear(tr_.get(), TOU8(key)); }
void Transaction::clearRange(const KeyRangeView &range) {
fdb_transaction_clear_range(tr_.get(), TOU8(range.beginKey), TOU8(range.endKey));
}
void Transaction::set(std::string_view key, std::string_view value) {
fdb_transaction_set(tr_.get(), TOU8(key), TOU8(value));
}
void Transaction::atomicOp(std::string_view key, std::string_view param, FDBMutationType operationType) {
return fdb_transaction_atomic_op(tr_.get(), TOU8(key), TOU8(param), operationType);
}
[[nodiscard]] fdb_error_t Transaction::getCommittedVersion(int64_t *outVersion) {
return fdb_transaction_get_committed_version(tr_.get(), outVersion);
}
fdb_error_t Transaction::addConflictRange(const KeyRangeView &range, FDBConflictRangeType type) {
return fdb_transaction_add_conflict_range(tr_.get(), TOU8(range.beginKey), TOU8(range.endKey), type);
}
} // namespace hf3fs::kv::fdb