ACTOR Future<Void> managementApiCorrectnessActor(Database cx_, SpecialKeySpaceCorrectnessWorkload* self)

in fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp [627:1480]


	ACTOR Future<Void> managementApiCorrectnessActor(Database cx_, SpecialKeySpaceCorrectnessWorkload* self) {
		// All management api related tests
		state Database cx = cx_->clone();
		state Reference<ReadYourWritesTransaction> tx = makeReference<ReadYourWritesTransaction>(cx);
		// test ordered option keys
		{
			tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
			for (const std::string& option : SpecialKeySpace::getManagementApiOptionsSet()) {
				tx->set(
				    "options/"_sr.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
				        .withSuffix(option),
				    ValueRef());
			}
			RangeResult result = wait(tx->getRange(
			    KeyRangeRef(LiteralStringRef("options/"), LiteralStringRef("options0"))
			        .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin),
			    CLIENT_KNOBS->TOO_MANY));
			ASSERT(!result.more && result.size() < CLIENT_KNOBS->TOO_MANY);
			ASSERT(result.size() == SpecialKeySpace::getManagementApiOptionsSet().size());
			ASSERT(self->getRangeResultInOrder(result));
			tx->reset();
		}
		// "exclude" error message shema check
		try {
			tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
			tx->set(LiteralStringRef("Invalid_Network_Address")
			            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("exclude")),
			        ValueRef());
			wait(tx->commit());
			ASSERT(false);
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled)
				throw;
			if (e.code() == error_code_special_keys_api_failure) {
				Optional<Value> errorMsg =
				    wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
				ASSERT(errorMsg.present());
				std::string errorStr;
				auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
				auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
				// special_key_space_management_api_error_msg schema validation
				ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
				ASSERT(valueObj["command"].get_str() == "exclude" && !valueObj["retriable"].get_bool());
			} else {
				TraceEvent(SevDebug, "UnexpectedError").detail("Command", "Exclude").error(e);
				wait(tx->onError(e));
			}
			tx->reset();
		}
		// "setclass"
		{
			try {
				tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
				// test getRange
				state RangeResult result = wait(tx->getRange(
				    KeyRangeRef(LiteralStringRef("process/class_type/"), LiteralStringRef("process/class_type0"))
				        .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin),
				    CLIENT_KNOBS->TOO_MANY));
				ASSERT(!result.more && result.size() < CLIENT_KNOBS->TOO_MANY);
				ASSERT(self->getRangeResultInOrder(result));
				// check correctness of classType of each process
				std::vector<ProcessData> workers = wait(getWorkers(&tx->getTransaction()));
				if (workers.size()) {
					for (const auto& worker : workers) {
						Key addr =
						    Key("process/class_type/" + formatIpPort(worker.address.ip, worker.address.port))
						        .withPrefix(
						            SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin);
						bool found = false;
						for (const auto& kv : result) {
							if (kv.key == addr) {
								ASSERT(kv.value.toString() == worker.processClass.toString());
								found = true;
								break;
							}
						}
						// Each process should find its corresponding element
						ASSERT(found);
					}
					state ProcessData worker = deterministicRandom()->randomChoice(workers);
					state Key addr =
					    Key("process/class_type/" + formatIpPort(worker.address.ip, worker.address.port))
					        .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin);
					tx->set(addr, LiteralStringRef("InvalidProcessType"));
					// test ryw
					Optional<Value> processType = wait(tx->get(addr));
					ASSERT(processType.present() && processType.get() == LiteralStringRef("InvalidProcessType"));
					// test ryw disabled
					tx->setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
					Optional<Value> originalProcessType = wait(tx->get(addr));
					ASSERT(originalProcessType.present() &&
					       originalProcessType.get() == worker.processClass.toString());
					// test error handling (invalid value type)
					wait(tx->commit());
					ASSERT(false);
				} else {
					// If no worker process returned, skip the test
					TraceEvent(SevDebug, "EmptyWorkerListInSetClassTest").log();
				}
			} catch (Error& e) {
				if (e.code() == error_code_actor_cancelled)
					throw;
				if (e.code() == error_code_special_keys_api_failure) {
					Optional<Value> errorMsg =
					    wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
					ASSERT(errorMsg.present());
					std::string errorStr;
					auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
					auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
					// special_key_space_management_api_error_msg schema validation
					ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
					ASSERT(valueObj["command"].get_str() == "setclass" && !valueObj["retriable"].get_bool());
				} else {
					TraceEvent(SevDebug, "UnexpectedError").detail("Command", "Setclass").error(e);
					wait(tx->onError(e));
				}
				tx->reset();
			}
		}
		// read class_source
		{
			try {
				// test getRange
				state RangeResult class_source_result = wait(tx->getRange(
				    KeyRangeRef(LiteralStringRef("process/class_source/"), LiteralStringRef("process/class_source0"))
				        .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin),
				    CLIENT_KNOBS->TOO_MANY));
				ASSERT(!class_source_result.more && class_source_result.size() < CLIENT_KNOBS->TOO_MANY);
				ASSERT(self->getRangeResultInOrder(class_source_result));
				// check correctness of classType of each process
				std::vector<ProcessData> workers = wait(getWorkers(&tx->getTransaction()));
				if (workers.size()) {
					for (const auto& worker : workers) {
						Key addr =
						    Key("process/class_source/" + formatIpPort(worker.address.ip, worker.address.port))
						        .withPrefix(
						            SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin);
						bool found = false;
						for (const auto& kv : class_source_result) {
							if (kv.key == addr) {
								ASSERT(kv.value.toString() == worker.processClass.sourceString());
								// Default source string is command_line
								ASSERT(kv.value == LiteralStringRef("command_line"));
								found = true;
								break;
							}
						}
						// Each process should find its corresponding element
						ASSERT(found);
					}
					ProcessData worker = deterministicRandom()->randomChoice(workers);
					state std::string address = formatIpPort(worker.address.ip, worker.address.port);
					tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					tx->set(
					    Key("process/class_type/" + address)
					        .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin),
					    Value(worker.processClass.toString())); // Set it as the same class type as before, thus only
					                                            // class source will be changed
					wait(tx->commit());
					tx->reset();
					Optional<Value> class_source = wait(tx->get(
					    Key("process/class_source/" + address)
					        .withPrefix(
					            SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
					TraceEvent(SevDebug, "SetClassSourceDebug")
					    .detail("Present", class_source.present())
					    .detail("ClassSource", class_source.present() ? class_source.get().toString() : "__Nothing");
					// Very rarely, we get an empty worker list, thus no class_source data
					if (class_source.present())
						ASSERT(class_source.get() == LiteralStringRef("set_class"));
					tx->reset();
				} else {
					// If no worker process returned, skip the test
					TraceEvent(SevDebug, "EmptyWorkerListInSetClassTest").log();
				}
			} catch (Error& e) {
				wait(tx->onError(e));
			}
		}
		// test lock and unlock
		// maske sure we lock the database
		loop {
			try {
				tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
				// lock the database
				UID uid = deterministicRandom()->randomUniqueID();
				tx->set(SpecialKeySpace::getManagementApiCommandPrefix("lock"), uid.toString());
				// commit
				wait(tx->commit());
				break;
			} catch (Error& e) {
				TraceEvent(SevDebug, "DatabaseLockFailure").error(e);
				// In case commit_unknown_result is thrown by buggify, we may try to lock more than once
				// The second lock commit will throw special_keys_api_failure error
				if (e.code() == error_code_special_keys_api_failure) {
					Optional<Value> errorMsg =
					    wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
					ASSERT(errorMsg.present());
					std::string errorStr;
					auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
					auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
					// special_key_space_management_api_error_msg schema validation
					ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
					ASSERT(valueObj["command"].get_str() == "lock" && !valueObj["retriable"].get_bool());
					break;
				} else if (e.code() == error_code_database_locked) {
					// Database is already locked. This can happen if a previous attempt
					// failed with unknown_result.
					break;
				} else {
					wait(tx->onError(e));
				}
			}
		}
		TraceEvent(SevDebug, "DatabaseLocked").log();
		// if database locked, fdb read should get database_locked error
		try {
			tx->reset();
			RangeResult res = wait(tx->getRange(normalKeys, 1));
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled)
				throw;
			ASSERT(e.code() == error_code_database_locked);
		}
		// make sure we unlock the database
		// unlock is idempotent, thus we can commit many times until successful
		loop {
			try {
				tx->reset();
				tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
				// unlock the database
				tx->clear(SpecialKeySpace::getManagementApiCommandPrefix("lock"));
				wait(tx->commit());
				TraceEvent(SevDebug, "DatabaseUnlocked").log();
				tx->reset();
				// read should be successful
				RangeResult res = wait(tx->getRange(normalKeys, 1));
				tx->reset();
				break;
			} catch (Error& e) {
				TraceEvent(SevDebug, "DatabaseUnlockFailure").error(e);
				ASSERT(e.code() != error_code_database_locked);
				wait(tx->onError(e));
			}
		}
		// test consistencycheck which only used by ConsistencyCheck Workload
		// Note: we have exclusive ownership of fdbShouldConsistencyCheckBeSuspended,
		// no existing workloads can modify the key
		{
			try {
				tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
				Optional<Value> val1 = wait(tx->get(fdbShouldConsistencyCheckBeSuspended));
				state bool ccSuspendSetting =
				    val1.present() ? BinaryReader::fromStringRef<bool>(val1.get(), Unversioned()) : false;
				Optional<Value> val2 =
				    wait(tx->get(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck")));
				// Make sure the read result from special key consistency with the system key
				ASSERT(ccSuspendSetting ? val2.present() : !val2.present());
				tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
				// Make sure by default, consistencycheck is enabled
				ASSERT(!ccSuspendSetting);
				// Disable consistencycheck
				tx->set(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck"), ValueRef());
				wait(tx->commit());
				tx->reset();
				// Read system key to make sure it is disabled
				tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
				Optional<Value> val3 = wait(tx->get(fdbShouldConsistencyCheckBeSuspended));
				bool ccSuspendSetting2 =
				    val3.present() ? BinaryReader::fromStringRef<bool>(val3.get(), Unversioned()) : false;
				ASSERT(ccSuspendSetting2);
				tx->reset();
			} catch (Error& e) {
				wait(tx->onError(e));
			}
		}
		// make sure we enable consistencycheck by the end
		{
			loop {
				try {
					tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					tx->clear(SpecialKeySpace::getManagementApiCommandPrefix("consistencycheck"));
					wait(tx->commit());
					tx->reset();
					break;
				} catch (Error& e) {
					wait(tx->onError(e));
				}
			}
		}
		// coordinators
		// test read, makes sure it's the same as reading from coordinatorsKey
		loop {
			try {
				tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
				Optional<Value> res = wait(tx->get(coordinatorsKey));
				ASSERT(res.present()); // Otherwise, database is in a bad state
				state ClusterConnectionString cs(res.get().toString());
				Optional<Value> coordinator_processes_key =
				    wait(tx->get(LiteralStringRef("processes")
				                     .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))));
				ASSERT(coordinator_processes_key.present());
				std::vector<std::string> process_addresses;
				boost::split(
				    process_addresses, coordinator_processes_key.get().toString(), [](char c) { return c == ','; });
				ASSERT(process_addresses.size() == cs.coordinators().size());
				// compare the coordinator process network addresses one by one
				for (const auto& network_address : cs.coordinators()) {
					ASSERT(std::find(process_addresses.begin(), process_addresses.end(), network_address.toString()) !=
					       process_addresses.end());
				}
				tx->reset();
				break;
			} catch (Error& e) {
				wait(tx->onError(e));
			}
		}
		// test change coordinators and cluster description
		// we randomly pick one process(not coordinator) and add it, in this case, it should always succeed
		{
			state std::string new_cluster_description;
			state std::string new_coordinator_process;
			state std::vector<std::string> old_coordinators_processes;
			state bool possible_to_add_coordinator;
			state KeyRange coordinators_key_range =
			    KeyRangeRef(LiteralStringRef("process/"), LiteralStringRef("process0"))
			        .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"));
			loop {
				try {
					tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
					Optional<Value> ccStrValue = wait(tx->get(coordinatorsKey));
					ASSERT(ccStrValue.present()); // Otherwise, database is in a bad state
					ClusterConnectionString ccStr(ccStrValue.get().toString());
					// choose a new description if configuration allows transactions across differently named clusters
					new_cluster_description = SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT
					                              ? deterministicRandom()->randomAlphaNumeric(8)
					                              : ccStr.clusterKeyName().toString();
					// get current coordinators
					Optional<Value> processes_key =
					    wait(tx->get(LiteralStringRef("processes")
					                     .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))));
					ASSERT(processes_key.present());
					boost::split(
					    old_coordinators_processes, processes_key.get().toString(), [](char c) { return c == ','; });
					// pick up one non-coordinator process if possible
					std::vector<ProcessData> workers = wait(getWorkers(&tx->getTransaction()));
					TraceEvent(SevDebug, "CoordinatorsManualChange")
					    .detail("OldCoordinators", describe(old_coordinators_processes))
					    .detail("WorkerSize", workers.size());
					if (workers.size() > old_coordinators_processes.size()) {
						loop {
							auto worker = deterministicRandom()->randomChoice(workers);
							new_coordinator_process = worker.address.toString();
							if (std::find(old_coordinators_processes.begin(),
							              old_coordinators_processes.end(),
							              worker.address.toString()) == old_coordinators_processes.end()) {
								break;
							}
						}
						possible_to_add_coordinator = true;
					} else {
						possible_to_add_coordinator = false;
					}
					tx->reset();
					break;
				} catch (Error& e) {
					wait(tx->onError(e));
					wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
				}
			}
			TraceEvent(SevDebug, "CoordinatorsManualChange")
			    .detail("NewCoordinator", possible_to_add_coordinator ? new_coordinator_process : "")
			    .detail("NewClusterDescription", new_cluster_description);
			if (possible_to_add_coordinator) {
				loop {
					try {
						std::string new_processes_key(new_coordinator_process);
						tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
						for (const auto& address : old_coordinators_processes) {
							new_processes_key += "," + address;
						}
						tx->set(LiteralStringRef("processes")
						            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")),
						        Value(new_processes_key));
						// update cluster description
						tx->set(LiteralStringRef("cluster_description")
						            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")),
						        Value(new_cluster_description));
						wait(tx->commit());
						ASSERT(false);
					} catch (Error& e) {
						TraceEvent(SevDebug, "CoordinatorsManualChange").error(e);
						// if we repeat doing the change, we will get the error:
						// CoordinatorsResult::SAME_NETWORK_ADDRESSES
						if (e.code() == error_code_special_keys_api_failure) {
							Optional<Value> errorMsg =
							    wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
							ASSERT(errorMsg.present());
							std::string errorStr;
							auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
							auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
							// special_key_space_management_api_error_msg schema validation
							ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
							TraceEvent(SevDebug, "CoordinatorsManualChange")
							    .detail("ErrorMessage", valueObj["message"].get_str());
							ASSERT(valueObj["command"].get_str() == "coordinators");
							if (valueObj["retriable"].get_bool()) { // coordinators not reachable, retry
								tx->reset();
							} else {
								ASSERT(valueObj["message"].get_str() ==
								       "No change (existing configuration satisfies request)");
								tx->reset();
								break;
							}
						} else {
							wait(tx->onError(e));
						}
						wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
					}
				}
				// change successful, now check it is already changed
				try {
					tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
					Optional<Value> res = wait(tx->get(coordinatorsKey));
					ASSERT(res.present()); // Otherwise, database is in a bad state
					ClusterConnectionString cs(res.get().toString());
					ASSERT(cs.coordinators().size() == old_coordinators_processes.size() + 1);
					// verify the coordinators' addresses
					for (const auto& network_address : cs.coordinators()) {
						std::string address_str = network_address.toString();
						ASSERT(std::find(old_coordinators_processes.begin(),
						                 old_coordinators_processes.end(),
						                 address_str) != old_coordinators_processes.end() ||
						       new_coordinator_process == address_str);
					}
					// verify the cluster decription
					ASSERT(new_cluster_description == cs.clusterKeyName().toString());
					tx->reset();
				} catch (Error& e) {
					wait(tx->onError(e));
					wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
				}
				// change back to original settings
				loop {
					try {
						std::string new_processes_key;
						tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
						for (const auto& address : old_coordinators_processes) {
							new_processes_key += new_processes_key.size() ? "," : "";
							new_processes_key += address;
						}
						tx->set(LiteralStringRef("processes")
						            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")),
						        Value(new_processes_key));
						wait(tx->commit());
						ASSERT(false);
					} catch (Error& e) {
						TraceEvent(SevDebug, "CoordinatorsManualChangeRevert").error(e);
						if (e.code() == error_code_special_keys_api_failure) {
							Optional<Value> errorMsg =
							    wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
							ASSERT(errorMsg.present());
							std::string errorStr;
							auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
							auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
							// special_key_space_management_api_error_msg schema validation
							ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
							TraceEvent(SevDebug, "CoordinatorsManualChangeRevert")
							    .detail("ErrorMessage", valueObj["message"].get_str());
							ASSERT(valueObj["command"].get_str() == "coordinators");
							if (valueObj["retriable"].get_bool()) {
								tx->reset();
							} else if (valueObj["message"].get_str() ==
							           "No change (existing configuration satisfies request)") {
								tx->reset();
								break;
							} else {
								TraceEvent(SevError, "CoordinatorsManualChangeRevert")
								    .detail("UnexpectedError", valueObj["message"].get_str());
								throw special_keys_api_failure();
							}
						} else {
							wait(tx->onError(e));
						}
						wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
					}
				}
			}
		}
		// advanceversion
		try {
			Version v1 = wait(tx->getReadVersion());
			TraceEvent(SevDebug, "InitialReadVersion").detail("Version", v1);
			state Version v2 = 2 * v1;
			loop {
				try {
					// loop until the grv is larger than the set version
					Version v3 = wait(tx->getReadVersion());
					if (v3 > v2) {
						TraceEvent(SevDebug, "AdvanceVersionSuccess").detail("Version", v3);
						break;
					}
					tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					// force the cluster to recover at v2
					tx->set(SpecialKeySpace::getManagementApiCommandPrefix("advanceversion"), std::to_string(v2));
					wait(tx->commit());
					ASSERT(false); // Should fail with commit_unknown_result
				} catch (Error& e) {
					TraceEvent(SevDebug, "AdvanceVersionCommitFailure").error(e);
					wait(tx->onError(e));
				}
			}
			tx->reset();
		} catch (Error& e) {
			wait(tx->onError(e));
		}
		// profile client get
		loop {
			try {
				tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
				// client_txn_sample_rate
				state Optional<Value> txnSampleRate =
				    wait(tx->get(LiteralStringRef("client_txn_sample_rate")
				                     .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile"))));
				ASSERT(txnSampleRate.present());
				Optional<Value> txnSampleRateKey = wait(tx->get(fdbClientInfoTxnSampleRate));
				if (txnSampleRateKey.present()) {
					const double sampleRateDbl =
					    BinaryReader::fromStringRef<double>(txnSampleRateKey.get(), Unversioned());
					if (!std::isinf(sampleRateDbl)) {
						ASSERT(txnSampleRate.get().toString() == boost::lexical_cast<std::string>(sampleRateDbl));
					} else {
						ASSERT(txnSampleRate.get().toString() == "default");
					}
				} else {
					ASSERT(txnSampleRate.get().toString() == "default");
				}
				// client_txn_size_limit
				state Optional<Value> txnSizeLimit =
				    wait(tx->get(LiteralStringRef("client_txn_size_limit")
				                     .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile"))));
				ASSERT(txnSizeLimit.present());
				Optional<Value> txnSizeLimitKey = wait(tx->get(fdbClientInfoTxnSizeLimit));
				if (txnSizeLimitKey.present()) {
					const int64_t sizeLimit =
					    BinaryReader::fromStringRef<int64_t>(txnSizeLimitKey.get(), Unversioned());
					if (sizeLimit != -1) {
						ASSERT(txnSizeLimit.get().toString() == boost::lexical_cast<std::string>(sizeLimit));
					} else {
						ASSERT(txnSizeLimit.get().toString() == "default");
					}
				} else {
					ASSERT(txnSizeLimit.get().toString() == "default");
				}
				tx->reset();
				break;
			} catch (Error& e) {
				TraceEvent(SevDebug, "ProfileClientGet").error(e);
				wait(tx->onError(e));
			}
		}
		{
			state double r_sample_rate = deterministicRandom()->random01();
			state int64_t r_size_limit = deterministicRandom()->randomInt64(1e3, 1e6);
			// update the sample rate and size limit
			loop {
				try {
					tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					tx->set(LiteralStringRef("client_txn_sample_rate")
					            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
					        Value(boost::lexical_cast<std::string>(r_sample_rate)));
					tx->set(LiteralStringRef("client_txn_size_limit")
					            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
					        Value(boost::lexical_cast<std::string>(r_size_limit)));
					wait(tx->commit());
					tx->reset();
					break;
				} catch (Error& e) {
					wait(tx->onError(e));
				}
			}
			// commit successfully, verify the system key changed
			loop {
				try {
					tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
					Optional<Value> sampleRate = wait(tx->get(fdbClientInfoTxnSampleRate));
					ASSERT(sampleRate.present());
					ASSERT(r_sample_rate == BinaryReader::fromStringRef<double>(sampleRate.get(), Unversioned()));
					Optional<Value> sizeLimit = wait(tx->get(fdbClientInfoTxnSizeLimit));
					ASSERT(sizeLimit.present());
					ASSERT(r_size_limit == BinaryReader::fromStringRef<int64_t>(sizeLimit.get(), Unversioned()));
					tx->reset();
					break;
				} catch (Error& e) {
					wait(tx->onError(e));
				}
			}
			// Change back to default
			loop {
				try {
					tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					tx->set(LiteralStringRef("client_txn_sample_rate")
					            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
					        LiteralStringRef("default"));
					tx->set(LiteralStringRef("client_txn_size_limit")
					            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
					        LiteralStringRef("default"));
					wait(tx->commit());
					tx->reset();
					break;
				} catch (Error& e) {
					wait(tx->onError(e));
				}
			}
			// Test invalid values
			loop {
				try {
					tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					tx->set((deterministicRandom()->coinflip() ? LiteralStringRef("client_txn_sample_rate")
					                                           : LiteralStringRef("client_txn_size_limit"))
					            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("profile")),
					        LiteralStringRef("invalid_value"));
					wait(tx->commit());
					ASSERT(false);
				} catch (Error& e) {
					if (e.code() == error_code_special_keys_api_failure) {
						Optional<Value> errorMsg =
						    wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
						ASSERT(errorMsg.present());
						std::string errorStr;
						auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
						auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
						// special_key_space_management_api_error_msg schema validation
						ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
						ASSERT(valueObj["command"].get_str() == "profile" && !valueObj["retriable"].get_bool());
						tx->reset();
						break;
					} else {
						wait(tx->onError(e));
					}
					wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
				}
			}
		}
		// data_distribution & maintenance get
		loop {
			try {
				// maintenance
				RangeResult maintenanceKVs = wait(
				    tx->getRange(SpecialKeySpace::getManamentApiCommandRange("maintenance"), CLIENT_KNOBS->TOO_MANY));
				// By default, no maintenance is going on
				ASSERT(!maintenanceKVs.more && !maintenanceKVs.size());
				// datadistribution
				RangeResult ddKVs = wait(tx->getRange(SpecialKeySpace::getManamentApiCommandRange("datadistribution"),
				                                      CLIENT_KNOBS->TOO_MANY));
				// By default, data_distribution/mode := "-1"
				ASSERT(!ddKVs.more && ddKVs.size() == 1);
				ASSERT(ddKVs[0].key == LiteralStringRef("mode").withPrefix(
				                           SpecialKeySpace::getManagementApiCommandPrefix("datadistribution")));
				ASSERT(ddKVs[0].value == Value(boost::lexical_cast<std::string>(-1)));
				tx->reset();
				break;
			} catch (Error& e) {
				TraceEvent(SevDebug, "MaintenanceGet").error(e);
				wait(tx->onError(e));
			}
		}
		// maintenance set
		{
			// Make sure setting more than one zone as maintenance will fail
			loop {
				try {
					tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					tx->set(Key(deterministicRandom()->randomAlphaNumeric(8))
					            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
					        Value(boost::lexical_cast<std::string>(deterministicRandom()->randomInt(1, 100))));
					// make sure this is a different zone id
					tx->set(Key(deterministicRandom()->randomAlphaNumeric(9))
					            .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
					        Value(boost::lexical_cast<std::string>(deterministicRandom()->randomInt(1, 100))));
					wait(tx->commit());
					ASSERT(false);
				} catch (Error& e) {
					TraceEvent(SevDebug, "MaintenanceSetMoreThanOneZone").error(e);
					if (e.code() == error_code_special_keys_api_failure) {
						Optional<Value> errorMsg =
						    wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
						ASSERT(errorMsg.present());
						std::string errorStr;
						auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
						auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
						// special_key_space_management_api_error_msg schema validation
						ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
						ASSERT(valueObj["command"].get_str() == "maintenance" && !valueObj["retriable"].get_bool());
						TraceEvent(SevDebug, "MaintenanceSetMoreThanOneZone")
						    .detail("ErrorMessage", valueObj["message"].get_str());
						tx->reset();
						break;
					} else {
						wait(tx->onError(e));
					}
					wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
				}
			}
			// Disable DD for SS failures
			state int ignoreSSFailuresRetry = 0;
			loop {
				try {
					tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					tx->set(ignoreSSFailuresZoneString.withPrefix(
					            SpecialKeySpace::getManagementApiCommandPrefix("maintenance")),
					        Value(boost::lexical_cast<std::string>(0)));
					wait(tx->commit());
					tx->reset();
					ignoreSSFailuresRetry++;
					wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
				} catch (Error& e) {
					TraceEvent(SevDebug, "MaintenanceDDIgnoreSSFailures").error(e);
					// the second commit will fail since maintenance not allowed to use while DD disabled for SS
					// failures
					if (e.code() == error_code_special_keys_api_failure) {
						Optional<Value> errorMsg =
						    wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
						ASSERT(errorMsg.present());
						std::string errorStr;
						auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
						auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
						// special_key_space_management_api_error_msg schema validation
						ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
						ASSERT(valueObj["command"].get_str() == "maintenance" && !valueObj["retriable"].get_bool());
						ASSERT(ignoreSSFailuresRetry > 0);
						TraceEvent(SevDebug, "MaintenanceDDIgnoreSSFailures")
						    .detail("Retry", ignoreSSFailuresRetry)
						    .detail("ErrorMessage", valueObj["message"].get_str());
						tx->reset();
						break;
					} else {
						wait(tx->onError(e));
					}
					ignoreSSFailuresRetry++;
					wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
				}
			}
			// set dd mode to 0 and disable DD for rebalance
			loop {
				try {
					tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution");
					tx->set(LiteralStringRef("mode").withPrefix(ddPrefix), LiteralStringRef("0"));
					tx->set(LiteralStringRef("rebalance_ignored").withPrefix(ddPrefix), Value());
					wait(tx->commit());
					tx->reset();
					break;
				} catch (Error& e) {
					TraceEvent(SevDebug, "DataDistributionDisableModeAndRebalance").error(e);
					wait(tx->onError(e));
				}
			}
			// verify underlying system keys are consistent with the change
			loop {
				try {
					tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
					// check DD disabled for SS failures
					Optional<Value> val1 = wait(tx->get(healthyZoneKey));
					ASSERT(val1.present());
					auto healthyZone = decodeHealthyZoneValue(val1.get());
					ASSERT(healthyZone.first == ignoreSSFailuresZoneString);
					// check DD mode
					Optional<Value> val2 = wait(tx->get(dataDistributionModeKey));
					ASSERT(val2.present());
					// mode should be set to 0
					ASSERT(BinaryReader::fromStringRef<int>(val2.get(), Unversioned()) == 0);
					// check DD disabled for rebalance
					Optional<Value> val3 = wait(tx->get(rebalanceDDIgnoreKey));
					// default value "on"
					ASSERT(val3.present() && val3.get() == LiteralStringRef("on"));
					tx->reset();
					break;
				} catch (Error& e) {
					wait(tx->onError(e));
				}
			}
			// then, clear all changes
			loop {
				try {
					tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					tx->clear(ignoreSSFailuresZoneString.withPrefix(
					    SpecialKeySpace::getManagementApiCommandPrefix("maintenance")));
					KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution");
					tx->clear(LiteralStringRef("mode").withPrefix(ddPrefix));
					tx->clear(LiteralStringRef("rebalance_ignored").withPrefix(ddPrefix));
					wait(tx->commit());
					tx->reset();
					break;
				} catch (Error& e) {
					wait(tx->onError(e));
				}
			}
			// verify all changes are cleared
			loop {
				try {
					tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
					// check DD SSFailures key
					Optional<Value> val1 = wait(tx->get(healthyZoneKey));
					ASSERT(!val1.present());
					// check DD mode
					Optional<Value> val2 = wait(tx->get(dataDistributionModeKey));
					ASSERT(!val2.present());
					// check DD rebalance key
					Optional<Value> val3 = wait(tx->get(rebalanceDDIgnoreKey));
					ASSERT(!val3.present());
					tx->reset();
					break;
				} catch (Error& e) {
					wait(tx->onError(e));
				}
			}
		}
		// make sure when we change dd related special keys, we grab the two system keys,
		// i.e. moveKeysLockOwnerKey and moveKeysLockWriteKey
		{
			state Reference<ReadYourWritesTransaction> tr1(new ReadYourWritesTransaction(cx));
			state Reference<ReadYourWritesTransaction> tr2(new ReadYourWritesTransaction(cx));
			loop {
				try {
					Version readVersion = wait(tr1->getReadVersion());
					tr2->setVersion(readVersion);
					tr1->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
					tr2->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
					KeyRef ddPrefix = SpecialKeySpace::getManagementApiCommandPrefix("datadistribution");
					tr1->set(LiteralStringRef("mode").withPrefix(ddPrefix), LiteralStringRef("1"));
					wait(tr1->commit());
					// randomly read the moveKeysLockOwnerKey/moveKeysLockWriteKey
					// both of them should be grabbed when changing dd mode
					wait(success(
					    tr2->get(deterministicRandom()->coinflip() ? moveKeysLockOwnerKey : moveKeysLockWriteKey)));
					// tr2 shoulde never succeed, just write to a key to make it not a read-only transaction
					tr2->set(LiteralStringRef("unused_key"), LiteralStringRef(""));
					wait(tr2->commit());
					ASSERT(false); // commit should always fail due to conflict
				} catch (Error& e) {
					if (e.code() != error_code_not_committed) {
						// when buggify is enabled, it's possible we get other retriable errors
						wait(tr2->onError(e));
						tr1->reset();
					} else {
						// loop until we get conflict error
						break;
					}
				}
			}
		}
		return Void();
	}