in fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp [627:1480]
// Runs every management-API special-key check of this workload, one section after another, mostly on
// a single reused ReadYourWritesTransaction (two extra transactions are created for the last check).
// Sections, in order: option keys, "exclude" error schema, "setclass", class_source, lock/unlock,
// consistencycheck, coordinators (read, change, revert), advanceversion, profile, maintenance /
// datadistribution, and the moveKeys lock conflict check.
// cx_  : database handle; it is cloned so the actor works on its own copy.
// self : the owning workload, used here only for getRangeResultInOrder().
// Returns Void() after the last section; violated expectations fire ASSERT.
ACTOR Future<Void> managementApiCorrectnessActor(Database cx_, SpecialKeySpaceCorrectnessWorkload* self) {
// All management api related tests
state Database cx = cx_->clone();
state Reference<ReadYourWritesTransaction> tx = makeReference<ReadYourWritesTransaction>(cx);
// test ordered option keys
{
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// Set every known management option key, then read the whole "options/" range back
for (const std::string& option : SpecialKeySpace::getManagementApiOptionsSet()) {
tx->set(
"options/"_sr.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
.withSuffix(option),
ValueRef());
}
RangeResult result = wait(tx->getRange(
KeyRangeRef(LiteralStringRef("options/"), LiteralStringRef("options0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin),
CLIENT_KNOBS->TOO_MANY));
// The read must be complete, contain exactly one entry per option, and be ordered
ASSERT(!result.more && result.size() < CLIENT_KNOBS->TOO_MANY);
ASSERT(result.size() == SpecialKeySpace::getManagementApiOptionsSet().size());
ASSERT(self->getRangeResultInOrder(result));
tx->reset();
}
// "exclude" error message schema check
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(LiteralStringRef("Invalid_Network_Address")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("exclude")),
ValueRef());
wait(tx->commit());
// The commit is expected to throw; reaching this line is a test failure
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;
// The handling below recurs in several sections of this actor: read the ERRORMSG module key,
// validate the JSON against managementApiErrorSchema, then check the command name and the
// retriable flag
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "exclude" && !valueObj["retriable"].get_bool());
} else {
// Any other error is only traced and passed to onError; the section is not retried
TraceEvent(SevDebug, "UnexpectedError").detail("Command", "Exclude").error(e);
wait(tx->onError(e));
}
tx->reset();
}
// "setclass"
{
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// test getRange
state RangeResult result = wait(tx->getRange(
KeyRangeRef(LiteralStringRef("process/class_type/"), LiteralStringRef("process/class_type0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin),
CLIENT_KNOBS->TOO_MANY));
ASSERT(!result.more && result.size() < CLIENT_KNOBS->TOO_MANY);
ASSERT(self->getRangeResultInOrder(result));
// check correctness of classType of each process
std::vector<ProcessData> workers = wait(getWorkers(&tx->getTransaction()));
if (workers.size()) {
for (const auto& worker : workers) {
Key addr =
Key("process/class_type/" + formatIpPort(worker.address.ip, worker.address.port))
.withPrefix(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin);
bool found = false;
for (const auto& kv : result) {
if (kv.key == addr) {
ASSERT(kv.value.toString() == worker.processClass.toString());
found = true;
break;
}
}
// Each process should find its corresponding element
ASSERT(found);
}
// Pick one worker at random and write an invalid class type for it
state ProcessData worker = deterministicRandom()->randomChoice(workers);
state Key addr =
Key("process/class_type/" + formatIpPort(worker.address.ip, worker.address.port))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin);
tx->set(addr, LiteralStringRef("InvalidProcessType"));
// test ryw
Optional<Value> processType = wait(tx->get(addr));
ASSERT(processType.present() && processType.get() == LiteralStringRef("InvalidProcessType"));
// test ryw disabled
tx->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
Optional<Value> originalProcessType = wait(tx->get(addr));
ASSERT(originalProcessType.present() &&
originalProcessType.get() == worker.processClass.toString());
// test error handling (invalid value type)
wait(tx->commit());
// The commit of the invalid class type is expected to throw
ASSERT(false);
} else {
// If no worker process returned, skip the test
TraceEvent(SevDebug, "EmptyWorkerListInSetClassTest").log();
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "setclass" && !valueObj["retriable"].get_bool());
} else {
TraceEvent(SevDebug, "UnexpectedError").detail("Command", "Setclass").error(e);
wait(tx->onError(e));
}
tx->reset();
}
}
// read class_source
{
// NOTE(review): this section makes a single attempt; any error goes to onError below and the
// remaining checks of the section are skipped.
try {
// test getRange
state RangeResult class_source_result = wait(tx->getRange(
KeyRangeRef(LiteralStringRef("process/class_source/"), LiteralStringRef("process/class_source0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin),
CLIENT_KNOBS->TOO_MANY));
ASSERT(!class_source_result.more && class_source_result.size() < CLIENT_KNOBS->TOO_MANY);
ASSERT(self->getRangeResultInOrder(class_source_result));
// check correctness of class source of each process
std::vector<ProcessData> workers = wait(getWorkers(&tx->getTransaction()));
if (workers.size()) {
for (const auto& worker : workers) {
Key addr =
Key("process/class_source/" + formatIpPort(worker.address.ip, worker.address.port))
.withPrefix(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin);
bool found = false;
for (const auto& kv : class_source_result) {
if (kv.key == addr) {
ASSERT(kv.value.toString() == worker.processClass.sourceString());
// Default source string is command_line
ASSERT(kv.value == LiteralStringRef("command_line"));
found = true;
break;
}
}
// Each process should find its corresponding element
ASSERT(found);
}
ProcessData worker = deterministicRandom()->randomChoice(workers);
state std::string address = formatIpPort(worker.address.ip, worker.address.port);
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(
Key("process/class_type/" + address)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin),
Value(worker.processClass.toString())); // Set it as the same class type as before, thus only
// class source will be changed
wait(tx->commit());
tx->reset();
// Read the class source of the same process back in a fresh transaction
Optional<Value> class_source = wait(tx->get(
Key("process/class_source/" + address)
.withPrefix(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
TraceEvent(SevDebug, "SetClassSourceDebug")
.detail("Present", class_source.present())
.detail("ClassSource", class_source.present() ? class_source.get().toString() : "__Nothing");
// Very rarely, we get an empty worker list, thus no class_source data
if (class_source.present())
ASSERT(class_source.get() == LiteralStringRef("set_class"));
tx->reset();
} else {
// If no worker process returned, skip the test
// NOTE(review): the event name is the same one the "setclass" section logs.
TraceEvent(SevDebug, "EmptyWorkerListInSetClassTest").log();
}
} catch (Error& e) {
wait(tx->onError(e));
}
}
// test lock and unlock
// make sure we lock the database
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// lock the database
UID uid = deterministicRandom()->randomUniqueID();
tx->set(SpecialKeySpace::getManagementApiCommandPrefix("lock"), uid.toString());
// commit
wait(tx->commit());
break;
} catch (Error& e) {
TraceEvent(SevDebug, "DatabaseLockFailure").error(e);
// In case commit_unknown_result is thrown by buggify, we may try to lock more than once
// The second lock commit will throw special_keys_api_failure error
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "lock" && !valueObj["retriable"].get_bool());
break;
} else if (e.code() == error_code_database_locked) {
// Database is already locked. This can happen if a previous attempt
// failed with unknown_result.
break;
} else {
wait(tx->onError(e));
}
}
}
TraceEvent(SevDebug, "DatabaseLocked").log();
// if database locked, fdb read should get database_locked error
// NOTE(review): nothing here fails the test if the read succeeds; only the error code of a
// thrown error is checked. Confirm that is intended.
try {
tx->reset();
RangeResult res = wait(tx->getRange(normalKeys, 1));
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;
ASSERT(e.code() == error_code_database_locked);
}
// make sure we unlock the database
// unlock is idempotent, thus we can commit many times until successful
loop {
try {
tx->reset();
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// unlock the database
tx->clear(SpecialKeySpace::getManagementApiCommandPrefix("lock"));
wait(tx->commit());
TraceEvent(SevDebug, "DatabaseUnlocked").log();
tx->reset();
// read should be successful
RangeResult res = wait(tx->getRange(normalKeys, 1));
tx->reset();
break;
} catch (Error& e) {
TraceEvent(SevDebug, "DatabaseUnlockFailure").error(e);
// After the unlock has been issued, database_locked is never an acceptable error
ASSERT(e.code() != error_code_database_locked);
wait(tx->onError(e));
}
}
// test consistencycheck, which is only used by the ConsistencyCheck Workload
// Note: we have exclusive ownership of fdbShouldConsistencyCheckBeSuspended,
// no existing workloads can modify the key
{
// NOTE(review): single attempt, no retry loop; on error the remaining assertions of this
// section are skipped after onError.
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> val1 = wait(tx->get(fdbShouldConsistencyCheckBeSuspended));
// An absent system key is treated as "not suspended"
state bool ccSuspendSetting =
val1.present() ? BinaryReader::fromStringRef<bool>(val1.get(), Unversioned()) : false;
Optional<Value> val2 =
wait(tx->get(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck")));
// Make sure the read result from the special key is consistent with the system key
ASSERT(ccSuspendSetting ? val2.present() : !val2.present());
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// Make sure by default, consistencycheck is enabled
ASSERT(!ccSuspendSetting);
// Disable consistencycheck
tx->set(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck"), ValueRef());
wait(tx->commit());
tx->reset();
// Read system key to make sure it is disabled
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> val3 = wait(tx->get(fdbShouldConsistencyCheckBeSuspended));
bool ccSuspendSetting2 =
val3.present() ? BinaryReader::fromStringRef<bool>(val3.get(), Unversioned()) : false;
ASSERT(ccSuspendSetting2);
tx->reset();
} catch (Error& e) {
wait(tx->onError(e));
}
}
// make sure we enable consistencycheck by the end
{
// Clearing the special key is retried until the commit succeeds
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->clear(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck"));
wait(tx->commit());
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
}
// coordinators
// test read, makes sure it's the same as reading from coordinatorsKey
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> res = wait(tx->get(coordinatorsKey));
ASSERT(res.present()); // Otherwise, database is in a bad state
state ClusterConnectionString cs(res.get().toString());
Optional<Value> coordinator_processes_key =
wait(tx->get(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))));
ASSERT(coordinator_processes_key.present());
// The special key value is split on ',' into one address string per coordinator
std::vector<std::string> process_addresses;
boost::split(
process_addresses, coordinator_processes_key.get().toString(), [](char c) { return c == ','; });
ASSERT(process_addresses.size() == cs.coordinators().size());
// compare the coordinator process network addresses one by one
for (const auto& network_address : cs.coordinators()) {
ASSERT(std::find(process_addresses.begin(), process_addresses.end(), network_address.toString()) !=
process_addresses.end());
}
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// test change coordinators and cluster description
// we randomly pick one process(not coordinator) and add it, in this case, it should always succeed
{
state std::string new_cluster_description;
state std::string new_coordinator_process;
state std::vector<std::string> old_coordinators_processes;
state bool possible_to_add_coordinator;
state KeyRange coordinators_key_range =
KeyRangeRef(LiteralStringRef("process/"), LiteralStringRef("process0"))
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"));
// Step 1: collect the current coordinators and decide which process (if any) to add
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> ccStrValue = wait(tx->get(coordinatorsKey));
ASSERT(ccStrValue.present()); // Otherwise, database is in a bad state
ClusterConnectionString ccStr(ccStrValue.get().toString());
// choose a new description if configuration allows transactions across differently named clusters
new_cluster_description = SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT
? deterministicRandom()->randomAlphaNumeric(8)
: ccStr.clusterKeyName().toString();
// get current coordinators
Optional<Value> processes_key =
wait(tx->get(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))));
ASSERT(processes_key.present());
boost::split(
old_coordinators_processes, processes_key.get().toString(), [](char c) { return c == ','; });
// pick up one non-coordinator process if possible
std::vector<ProcessData> workers = wait(getWorkers(&tx->getTransaction()));
TraceEvent(SevDebug, "CoordinatorsManualChange")
.detail("OldCoordinators", describe(old_coordinators_processes))
.detail("WorkerSize", workers.size());
if (workers.size() > old_coordinators_processes.size()) {
// Keep drawing random workers until one is found whose address is not an existing coordinator
loop {
auto worker = deterministicRandom()->randomChoice(workers);
new_coordinator_process = worker.address.toString();
if (std::find(old_coordinators_processes.begin(),
old_coordinators_processes.end(),
worker.address.toString()) == old_coordinators_processes.end()) {
break;
}
}
possible_to_add_coordinator = true;
} else {
possible_to_add_coordinator = false;
}
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
// Pause before the next attempt
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
TraceEvent(SevDebug, "CoordinatorsManualChange")
.detail("NewCoordinator", possible_to_add_coordinator ? new_coordinator_process : "")
.detail("NewClusterDescription", new_cluster_description);
if (possible_to_add_coordinator) {
// Step 2: request the change. The commit is expected to throw every time; the loop exits only
// once an attempt reports the non-retriable "No change" message.
loop {
try {
std::string new_processes_key(new_coordinator_process);
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
for (const auto& address : old_coordinators_processes) {
new_processes_key += "," + address;
}
tx->set(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")),
Value(new_processes_key));
// update cluster description
tx->set(LiteralStringRef("cluster_description")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")),
Value(new_cluster_description));
wait(tx->commit());
ASSERT(false);
} catch (Error& e) {
TraceEvent(SevDebug, "CoordinatorsManualChange").error(e);
// if we repeat doing the change, we will get the error:
// CoordinatorsResult::SAME_NETWORK_ADDRESSES
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
TraceEvent(SevDebug, "CoordinatorsManualChange")
.detail("ErrorMessage", valueObj["message"].get_str());
ASSERT(valueObj["command"].get_str() == "coordinators");
if (valueObj["retriable"].get_bool()) { // coordinators not reachable, retry
tx->reset();
} else {
ASSERT(valueObj["message"].get_str() ==
"No change (existing configuration satisfies request)");
tx->reset();
break;
}
} else {
wait(tx->onError(e));
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
// change successful, now check it is already changed
// NOTE(review): this verification is not in a retry loop; if a read throws, the checks are
// skipped after onError and the revert below still runs.
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> res = wait(tx->get(coordinatorsKey));
ASSERT(res.present()); // Otherwise, database is in a bad state
ClusterConnectionString cs(res.get().toString());
ASSERT(cs.coordinators().size() == old_coordinators_processes.size() + 1);
// verify the coordinators' addresses
for (const auto& network_address : cs.coordinators()) {
std::string address_str = network_address.toString();
ASSERT(std::find(old_coordinators_processes.begin(),
old_coordinators_processes.end(),
address_str) != old_coordinators_processes.end() ||
new_coordinator_process == address_str);
}
// verify the cluster description
ASSERT(new_cluster_description == cs.clusterKeyName().toString());
tx->reset();
} catch (Error& e) {
wait(tx->onError(e));
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
// change back to original settings
// Same pattern as the change above: the commit is expected to throw, and the loop exits once
// the non-retriable "No change" message is reported
loop {
try {
std::string new_processes_key;
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// Join the original coordinator addresses with ',' (no leading separator)
for (const auto& address : old_coordinators_processes) {
new_processes_key += new_processes_key.size() ? "," : "";
new_processes_key += address;
}
tx->set(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")),
Value(new_processes_key));
wait(tx->commit());
ASSERT(false);
} catch (Error& e) {
TraceEvent(SevDebug, "CoordinatorsManualChangeRevert").error(e);
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
TraceEvent(SevDebug, "CoordinatorsManualChangeRevert")
.detail("ErrorMessage", valueObj["message"].get_str());
ASSERT(valueObj["command"].get_str() == "coordinators");
if (valueObj["retriable"].get_bool()) {
tx->reset();
} else if (valueObj["message"].get_str() ==
"No change (existing configuration satisfies request)") {
tx->reset();
break;
} else {
// Any other non-retriable message is logged at SevError and rethrown out of the actor
TraceEvent(SevError, "CoordinatorsManualChangeRevert")
.detail("UnexpectedError", valueObj["message"].get_str());
throw special_keys_api_failure();
}
} else {
wait(tx->onError(e));
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
}
}
// advanceversion
try {
Version v1 = wait(tx->getReadVersion());
TraceEvent(SevDebug, "InitialReadVersion").detail("Version", v1);
// Target version: twice the initial read version
state Version v2 = 2 * v1;
loop {
try {
// loop until the grv is larger than the set version
Version v3 = wait(tx->getReadVersion());
if (v3 > v2) {
TraceEvent(SevDebug, "AdvanceVersionSuccess").detail("Version", v3);
break;
}
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// force the cluster to recover at v2
tx->set(SpecialKeySpace::getManagementApiCommandPrefix("advanceversion"), std::to_string(v2));
wait(tx->commit());
ASSERT(false); // Should fail with commit_unknown_result
} catch (Error& e) {
TraceEvent(SevDebug, "AdvanceVersionCommitFailure").error(e);
wait(tx->onError(e));
}
}
tx->reset();
} catch (Error& e) {
wait(tx->onError(e));
}
// profile client get
// Each special key must mirror its system key: the formatted value when the system key holds a
// real setting, otherwise the string "default"
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
// client_txn_sample_rate
state Optional<Value> txnSampleRate =
wait(tx->get(LiteralStringRef("client_txn_sample_rate")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile"))));
ASSERT(txnSampleRate.present());
Optional<Value> txnSampleRateKey = wait(tx->get(fdbClientInfoTxnSampleRate));
if (txnSampleRateKey.present()) {
const double sampleRateDbl =
BinaryReader::fromStringRef<double>(txnSampleRateKey.get(), Unversioned());
// An infinite stored sample rate is expected to read back as "default"
if (!std::isinf(sampleRateDbl)) {
ASSERT(txnSampleRate.get().toString() == boost::lexical_cast<std::string>(sampleRateDbl));
} else {
ASSERT(txnSampleRate.get().toString() == "default");
}
} else {
ASSERT(txnSampleRate.get().toString() == "default");
}
// client_txn_size_limit
state Optional<Value> txnSizeLimit =
wait(tx->get(LiteralStringRef("client_txn_size_limit")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile"))));
ASSERT(txnSizeLimit.present());
Optional<Value> txnSizeLimitKey = wait(tx->get(fdbClientInfoTxnSizeLimit));
if (txnSizeLimitKey.present()) {
const int64_t sizeLimit =
BinaryReader::fromStringRef<int64_t>(txnSizeLimitKey.get(), Unversioned());
// A stored size limit of -1 is expected to read back as "default"
if (sizeLimit != -1) {
ASSERT(txnSizeLimit.get().toString() == boost::lexical_cast<std::string>(sizeLimit));
} else {
ASSERT(txnSizeLimit.get().toString() == "default");
}
} else {
ASSERT(txnSizeLimit.get().toString() == "default");
}
tx->reset();
break;
} catch (Error& e) {
TraceEvent(SevDebug, "ProfileClientGet").error(e);
wait(tx->onError(e));
}
}
// profile client set: write random values, verify the system keys, restore defaults, then try
// an invalid value
{
state double r_sample_rate = deterministicRandom()->random01();
state int64_t r_size_limit = deterministicRandom()->randomInt64(1e3, 1e6);
// update the sample rate and size limit
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(LiteralStringRef("client_txn_sample_rate")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
Value(boost::lexical_cast<std::string>(r_sample_rate)));
tx->set(LiteralStringRef("client_txn_size_limit")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
Value(boost::lexical_cast<std::string>(r_size_limit)));
wait(tx->commit());
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// commit successfully, verify the system key changed
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> sampleRate = wait(tx->get(fdbClientInfoTxnSampleRate));
ASSERT(sampleRate.present());
ASSERT(r_sample_rate == BinaryReader::fromStringRef<double>(sampleRate.get(), Unversioned()));
Optional<Value> sizeLimit = wait(tx->get(fdbClientInfoTxnSizeLimit));
ASSERT(sizeLimit.present());
ASSERT(r_size_limit == BinaryReader::fromStringRef<int64_t>(sizeLimit.get(), Unversioned()));
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// Change back to default
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(LiteralStringRef("client_txn_sample_rate")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
LiteralStringRef("default"));
tx->set(LiteralStringRef("client_txn_size_limit")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
LiteralStringRef("default"));
wait(tx->commit());
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// Test invalid values
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
// One of the two profile keys is chosen by coin flip and given an unparsable value
tx->set((deterministicRandom()->coinflip() ? LiteralStringRef("client_txn_sample_rate")
: LiteralStringRef("client_txn_size_limit"))
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
LiteralStringRef("invalid_value"));
wait(tx->commit());
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "profile" && !valueObj["retriable"].get_bool());
tx->reset();
break;
} else {
wait(tx->onError(e));
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
}
// data_distribution & maintenance get
loop {
try {
// maintenance
RangeResult maintenanceKVs = wait(
tx->getRange(SpecialKeySpace::getManamentApiCommandRange("maintenance"), CLIENT_KNOBS->TOO_MANY));
// By default, no maintenance is going on
ASSERT(!maintenanceKVs.more && !maintenanceKVs.size());
// datadistribution
RangeResult ddKVs = wait(tx->getRange(SpecialKeySpace::getManamentApiCommandRange("datadistribution"),
CLIENT_KNOBS->TOO_MANY));
// By default, data_distribution/mode := "-1"
ASSERT(!ddKVs.more && ddKVs.size() == 1);
ASSERT(ddKVs[0].key == LiteralStringRef("mode").withPrefix(
SpecialKeySpace::getManagementApiCommandPrefix("datadistribution")));
ASSERT(ddKVs[0].value == Value(boost::lexical_cast<std::string>(-1)));
tx->reset();
break;
} catch (Error& e) {
TraceEvent(SevDebug, "MaintenanceGet").error(e);
wait(tx->onError(e));
}
}
// maintenance set
{
// Make sure setting more than one zone as maintenance will fail
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(Key(deterministicRandom()->randomAlphaNumeric(8))
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
Value(boost::lexical_cast<std::string>(deterministicRandom()->randomInt(1, 100))));
// make sure this is a different zone id
// (the two random ids are requested with lengths 8 and 9)
tx->set(Key(deterministicRandom()->randomAlphaNumeric(9))
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
Value(boost::lexical_cast<std::string>(deterministicRandom()->randomInt(1, 100))));
wait(tx->commit());
ASSERT(false);
} catch (Error& e) {
TraceEvent(SevDebug, "MaintenanceSetMoreThanOneZone").error(e);
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "maintenance" && !valueObj["retriable"].get_bool());
TraceEvent(SevDebug, "MaintenanceSetMoreThanOneZone")
.detail("ErrorMessage", valueObj["message"].get_str());
tx->reset();
break;
} else {
wait(tx->onError(e));
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
// Disable DD for SS failures
// The same write is committed repeatedly; the loop exits only when a later attempt fails with
// special_keys_api_failure. ignoreSSFailuresRetry counts the attempts made before that failure.
state int ignoreSSFailuresRetry = 0;
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(ignoreSSFailuresZoneString.withPrefix(
SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
Value(boost::lexical_cast<std::string>(0)));
wait(tx->commit());
tx->reset();
ignoreSSFailuresRetry++;
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
} catch (Error& e) {
TraceEvent(SevDebug, "MaintenanceDDIgnoreSSFailures").error(e);
// the second commit will fail since maintenance not allowed to use while DD disabled for SS
// failures
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "maintenance" && !valueObj["retriable"].get_bool());
// The failure must not come from the very first attempt
ASSERT(ignoreSSFailuresRetry > 0);
TraceEvent(SevDebug, "MaintenanceDDIgnoreSSFailures")
.detail("Retry", ignoreSSFailuresRetry)
.detail("ErrorMessage", valueObj["message"].get_str());
tx->reset();
break;
} else {
wait(tx->onError(e));
}
ignoreSSFailuresRetry++;
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
// set dd mode to 0 and disable DD for rebalance
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution");
tx->set(LiteralStringRef("mode").withPrefix(ddPrefix), LiteralStringRef("0"));
tx->set(LiteralStringRef("rebalance_ignored").withPrefix(ddPrefix), Value());
wait(tx->commit());
tx->reset();
break;
} catch (Error& e) {
TraceEvent(SevDebug, "DataDistributionDisableModeAndRebalance").error(e);
wait(tx->onError(e));
}
}
// verify underlying system keys are consistent with the change
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
// check DD disabled for SS failures
Optional<Value> val1 = wait(tx->get(healthyZoneKey));
ASSERT(val1.present());
auto healthyZone = decodeHealthyZoneValue(val1.get());
ASSERT(healthyZone.first == ignoreSSFailuresZoneString);
// check DD mode
Optional<Value> val2 = wait(tx->get(dataDistributionModeKey));
ASSERT(val2.present());
// mode should be set to 0
ASSERT(BinaryReader::fromStringRef<int>(val2.get(), Unversioned()) == 0);
// check DD disabled for rebalance
Optional<Value> val3 = wait(tx->get(rebalanceDDIgnoreKey));
// default value "on"
ASSERT(val3.present() && val3.get() == LiteralStringRef("on"));
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// then, clear all changes
loop {
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->clear(ignoreSSFailuresZoneString.withPrefix(
SpecialKeySpace::getManagementApiCommandPrefix("maintenance")));
KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution");
tx->clear(LiteralStringRef("mode").withPrefix(ddPrefix));
tx->clear(LiteralStringRef("rebalance_ignored").withPrefix(ddPrefix));
wait(tx->commit());
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// verify all changes are cleared
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
// check DD SSFailures key
Optional<Value> val1 = wait(tx->get(healthyZoneKey));
ASSERT(!val1.present());
// check DD mode
Optional<Value> val2 = wait(tx->get(dataDistributionModeKey));
ASSERT(!val2.present());
// check DD rebalance key
Optional<Value> val3 = wait(tx->get(rebalanceDDIgnoreKey));
ASSERT(!val3.present());
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
}
// make sure when we change dd related special keys, we grab the two system keys,
// i.e. moveKeysLockOwnerKey and moveKeysLockWriteKey
{
// tr1 changes the dd mode through the special key; tr2 is pinned to tr1's read version, reads
// one of the lock keys and then writes, so its commit is expected to end in not_committed
state Reference<ReadYourWritesTransaction> tr1(new ReadYourWritesTransaction(cx));
state Reference<ReadYourWritesTransaction> tr2(new ReadYourWritesTransaction(cx));
loop {
try {
Version readVersion = wait(tr1->getReadVersion());
tr2->setVersion(readVersion);
tr1->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr2->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution");
tr1->set(LiteralStringRef("mode").withPrefix(ddPrefix), LiteralStringRef("1"));
wait(tr1->commit());
// randomly read the moveKeysLockOwnerKey/moveKeysLockWriteKey
// both of them should be grabbed when changing dd mode
wait(success(
tr2->get(deterministicRandom()->coinflip() ? moveKeysLockOwnerKey : moveKeysLockWriteKey)));
// tr2 should never succeed, just write to a key to make it not a read-only transaction
tr2->set(LiteralStringRef("unused_key"), LiteralStringRef(""));
wait(tr2->commit());
ASSERT(false); // commit should always fail due to conflict
} catch (Error& e) {
if (e.code() != error_code_not_committed) {
// when buggify is enabled, it's possible we get other retriable errors
// Note that onError is invoked on tr2 whichever transaction raised the error; tr1 is only reset
wait(tr2->onError(e));
tr1->reset();
} else {
// loop until we get conflict error
break;
}
}
}
}
return Void();
}