in fdbcli/fdbcli.actor.cpp [1556:2342]
// Runs an fdbcli session: opens the cluster named by `opt`, then reads and executes commands.
//
// Commands come from `opt.exec` when it is present (the line is executed once and the actor returns),
// otherwise from interactive prompts read through `plinenoise`. A single input line may be parsed into
// several commands; they run in order, and once one of them fails the rest of the line is skipped.
//
// Parameters:
//   opt        - parsed command-line options (cluster file, API version, optional exec line, tracing, ...).
//   plinenoise - line editor used for prompting and command history; dereferenced unconditionally.
//
// Returns 0 on end of input, on `exit`/`quit`, or when an exec line succeeds; returns 1 when the cluster
// file or the database cannot be opened, or when an exec line fails.
ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
	state LineNoise& linenoise = *plinenoise;
	// True between a successful `begin` and the matching `commit`/`rollback` (or an error).
	state bool intrans = false;
	state Database localDb;
	state Reference<IDatabase> db;
	state Reference<ITransaction> tr;
	// NOTE(review): `trx` and `clusterConnectString` are not referenced anywhere else in this function.
	state Transaction trx;
	// `set`, `clear` and `clearrange` are refused until `writemode on` flips this.
	state bool writeMode = false;
	state std::string clusterConnectString;
	state std::map<Key, std::pair<Value, ClientLeaderRegInterface>> address_interface;
	// `options` points at globalOptions outside a `begin` transaction and at activeOptions inside one.
	state FdbOptions globalOptions;
	state FdbOptions activeOptions;
	state FdbOptions* options = &globalOptions;
	state Reference<ClusterConnectionFile> ccf;
	state std::pair<std::string, bool> resolvedClusterFile =
	    ClusterConnectionFile::lookupClusterFileName(opt.clusterFile);
	try {
		ccf = makeReference<ClusterConnectionFile>(resolvedClusterFile.first);
	} catch (Error& e) {
		fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
		return 1;
	}
	// Ordinarily, this is done when the network is run. However, the network thread should be set before
	// TraceEvents are logged. This thread will eventually run the network, so call it now.
	TraceEvent::setNetworkThread();
	try {
		localDb = Database::createDatabase(ccf, opt.api_version, IsInternal::False);
		if (!opt.exec.present()) {
			printf("Using cluster file `%s'.\n", ccf->getLocation().c_str());
		}
		db = API->createDatabase(opt.clusterFile.c_str());
	} catch (Error& e) {
		fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code());
		printf("Unable to connect to cluster from `%s'\n", ccf->getLocation().c_str());
		return 1;
	}
	if (opt.trace) {
		TraceEvent("CLIProgramStart")
		    .setMaxEventLength(12000)
		    .detail("SourceVersion", getSourceVersion())
		    .detail("Version", FDB_VT_VERSION)
		    .detail("PackageName", FDB_VT_PACKAGE_NAME)
		    // Cast explicitly so the variadic argument really is a `long long`, as "%lld" requires.
		    .detailf("ActualTime", "%lld", static_cast<long long>(DEBUG_DETERMINISM ? 0 : time(nullptr)))
		    .detail("ClusterFile", ccf->toString())
		    .detail("ConnectionString", ccf->getConnectionString().toString())
		    .setMaxFieldLength(10000)
		    .detail("CommandLine", opt.commandLine)
		    .trackLatest("ProgramStart");
	}
	// When external clients are used, the first attempt to establish a connection to the cluster throws
	// cluster_version_changed. Absorb that error here by issuing a get-read-version request up front.
	// The 3.0 second timeout is a guard to avoid waiting forever when the cli cannot talk to any coordinators.
	loop {
		try {
			getTransaction(db, tr, options, intrans);
			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
			wait(delay(3.0) || success(safeThreadFutureToFuture(tr->getReadVersion())));
			break;
		} catch (Error& e) {
			if (e.code() == error_code_cluster_version_changed) {
				wait(safeThreadFutureToFuture(tr->onError(e)));
			} else {
				// unexpected errors
				fprintf(stderr, "ERROR: unexpected error %d while initializing the multiversion database\n", e.code());
				tr->reset();
				break;
			}
		}
	}
	// Interactive sessions only: optional status summary, then the welcome banner.
	if (!opt.exec.present()) {
		if (opt.initialStatusCheck) {
			Future<Void> checkStatusF = checkStatus(Void(), db, localDb);
			wait(makeInterruptable(success(checkStatusF)));
		} else {
			printf("\n");
		}
		printf("Welcome to the fdbcli. For help, type `help'.\n");
		// `validOptions` is not declared in this function; it is assigned here for use elsewhere.
		validOptions = options->getValidOptions();
	}
	state bool is_error = false;
	// Pending "long delay" warning for the command line currently being executed.
	state Future<Void> warn;
	// One iteration per input line.
	loop {
		if (warn.isValid())
			warn.cancel();
		state std::string line;
		if (opt.exec.present()) {
			line = opt.exec.get();
		} else {
			Optional<std::string> rawline = wait(linenoise.read("fdb> "));
			if (!rawline.present()) {
				// No line was returned (e.g. end of input): leave the shell.
				printf("\n");
				return 0;
			}
			line = rawline.get();
			if (!line.size())
				continue;
			// Don't put dangerous commands in the command history
			if (line.find("writemode") == std::string::npos && line.find("expensive_data_check") == std::string::npos &&
			    line.find("unlock") == std::string::npos && line.find("blobrange") == std::string::npos)
				linenoise.historyAdd(line);
		}
		warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db, localDb);
		try {
			// The same random ID tags the "before" and "after" CLICommandLog events for this line.
			state UID randomID = deterministicRandom()->randomUniqueID();
			TraceEvent(SevInfo, "CLICommandLog", randomID).detail("Command", line);
			bool malformed, partial;
			state std::vector<std::vector<StringRef>> parsed = parseLine(line, malformed, partial);
			if (malformed)
				LogCommand(line, randomID, "ERROR: malformed escape sequence");
			if (partial)
				LogCommand(line, randomID, "ERROR: unterminated quote");
			if (malformed || partial) {
				if (parsed.size() > 0) {
					// Denote via a special token that the command was a parse failure.
					auto& last_command = parsed.back();
					last_command.insert(last_command.begin(),
					                    StringRef((const uint8_t*)"parse_error", strlen("parse_error")));
				}
			}
			// When a line holds several commands, each one is echoed before it runs.
			state bool multi = parsed.size() > 1;
			is_error = false;
			state std::vector<std::vector<StringRef>>::iterator iter;
			for (iter = parsed.begin(); iter != parsed.end(); ++iter) {
				state std::vector<StringRef> tokens = *iter;
				if (is_error) {
					printf("WARNING: the previous command failed, the remaining commands will not be executed.\n");
					break;
				}
				if (!tokens.size())
					continue;
				if (tokencmp(tokens[0], "parse_error")) {
					fprintf(stderr, "ERROR: Command failed to completely parse.\n");
					if (tokens.size() > 1) {
						// Keep the whole diagnostic on stderr so that redirecting one stream cannot split
						// the message prefix from the echoed tokens.
						fprintf(stderr, "ERROR: Not running partial or malformed command:");
						for (auto t = tokens.begin() + 1; t != tokens.end(); ++t)
							fprintf(stderr, " %s", formatStringRef(*t, true).c_str());
						fprintf(stderr, "\n");
					}
					is_error = true;
					continue;
				}
				if (multi) {
					printf(">>>");
					for (auto t = tokens.begin(); t != tokens.end(); ++t)
						printf(" %s", formatStringRef(*t, true).c_str());
					printf("\n");
				}
				// A command must be listed in helpMap or hiddenCommands to be dispatched at all.
				if (!helpMap.count(tokens[0].toString()) && !hiddenCommands.count(tokens[0].toString())) {
					fprintf(stderr, "ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
					is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "exit") || tokencmp(tokens[0], "quit")) {
					return 0;
				}
				if (tokencmp(tokens[0], "help")) {
					if (tokens.size() == 1) {
						printHelpOverview();
					} else if (tokens.size() == 2) {
						if (tokencmp(tokens[1], "escaping"))
							printf("\n"
							       "When parsing commands, fdbcli considers a space to delimit individual tokens.\n"
							       "To include a space in a single token, you may either enclose the token in\n"
							       "quotation marks (\"hello world\"), prefix the space with a backslash\n"
							       "(hello\\ world), or encode the space as a hex byte (hello\\x20world).\n"
							       "\n"
							       "To include a literal quotation mark in a token, precede it with a backslash\n"
							       "(\\\"hello\\ world\\\").\n"
							       "\n"
							       "To express a binary value, encode each byte as a two-digit hex byte, preceded\n"
							       "by \\x (e.g. \\x20 for a space character, or \\x0a\\x00\\x00\\x00 for a\n"
							       "32-bit, little-endian representation of the integer 10).\n"
							       "\n"
							       "All keys and values are displayed by the fdbcli with non-printable characters\n"
							       "and spaces encoded as two-digit hex bytes.\n\n");
						else if (tokencmp(tokens[1], "options")) {
							printf("\n"
							       "The following options are available to be set using the `option' command:\n"
							       "\n");
							options->printHelpString();
						} else if (tokencmp(tokens[1], "help"))
							printHelpOverview();
						else
							printHelp(tokens[1]);
					} else
						printf("Usage: help [topic]\n");
					continue;
				}
				if (tokencmp(tokens[0], "waitconnected")) {
					wait(makeInterruptable(localDb->onConnected()));
					continue;
				}
				if (tokencmp(tokens[0], "waitopen")) {
					wait(makeInterruptable(
					    success(safeThreadFutureToFuture(getTransaction(db, tr, options, intrans)->getReadVersion()))));
					continue;
				}
				if (tokencmp(tokens[0], "sleep")) {
					if (tokens.size() != 2) {
						printUsage(tokens[0]);
						is_error = true;
					} else {
						double v;
						int n = 0;
						// %n records how many characters were consumed; requiring it to equal the token
						// length rejects trailing garbage after the number.
						if (sscanf(tokens[1].toString().c_str(), "%lf%n", &v, &n) != 1 || n != tokens[1].size()) {
							printUsage(tokens[0]);
							is_error = true;
						} else {
							wait(delay(v));
						}
					}
					continue;
				}
				if (tokencmp(tokens[0], "status")) {
					// Warn at 7 seconds since status will spend as long as 5 seconds trying to read/write from the
					// database
					warn = timeWarning(7.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n");
					bool _result = wait(makeInterruptable(statusCommandActor(db, localDb, tokens, opt.exec.present())));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "triggerddteaminfolog")) {
					wait(success(makeInterruptable(triggerddteaminfologCommandActor(db))));
					continue;
				}
				if (tokencmp(tokens[0], "tssq")) {
					bool _result = wait(makeInterruptable(tssqCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "configure")) {
					bool _result =
					    wait(makeInterruptable(configureCommandActor(db, localDb, tokens, &linenoise, warn)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "fileconfigure")) {
					// Accepted forms: `fileconfigure <file>` and `fileconfigure new|FORCE <file>`.
					if (tokens.size() == 2 || (tokens.size() == 3 && (tokens[1] == LiteralStringRef("new") ||
					                                                  tokens[1] == LiteralStringRef("FORCE")))) {
						bool _result =
						    wait(makeInterruptable(fileConfigureCommandActor(db,
						                                                     tokens.back().toString(),
						                                                     tokens[1] == LiteralStringRef("new"),
						                                                     tokens[1] == LiteralStringRef("FORCE"))));
						if (!_result)
							is_error = true;
					} else {
						printUsage(tokens[0]);
						is_error = true;
					}
					continue;
				}
				if (tokencmp(tokens[0], "coordinators")) {
					bool _result = wait(makeInterruptable(coordinatorsCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "exclude")) {
					bool _result = wait(makeInterruptable(excludeCommandActor(db, tokens, warn)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "include")) {
					bool _result = wait(makeInterruptable(includeCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "snapshot")) {
					bool _result = wait(makeInterruptable(snapshotCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "lock")) {
					bool _result = wait(makeInterruptable(lockCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "changefeed")) {
					bool _result = wait(makeInterruptable(changeFeedCommandActor(localDb, tokens, warn)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "blobrange")) {
					bool _result = wait(makeInterruptable(blobRangeCommandActor(localDb, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "unlock")) {
					// The single argument must be exactly 32 hexadecimal digits.
					if ((tokens.size() != 2) || (tokens[1].size() != 32) ||
					    !std::all_of(tokens[1].begin(), tokens[1].end(), &isxdigit)) {
						printUsage(tokens[0]);
						is_error = true;
					} else {
						// Require the user to retype a freshly generated passphrase before proceeding.
						state std::string passPhrase = deterministicRandom()->randomAlphaNumeric(10);
						warn.cancel(); // don't warn while waiting on user input
						printf("Unlocking the database is a potentially dangerous operation.\n");
						printf("%s\n", passPhrase.c_str());
						fflush(stdout);
						Optional<std::string> input =
						    wait(linenoise.read(format("Repeat the above passphrase if you would like to proceed:")));
						// Re-arm the long-delay warning now that user input is finished.
						warn =
						    checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db, localDb);
						if (input.present() && input.get() == passPhrase) {
							UID unlockUID = UID::fromString(tokens[1].toString());
							bool _result = wait(makeInterruptable(unlockDatabaseActor(db, unlockUID)));
							if (!_result)
								is_error = true;
						} else {
							fprintf(stderr, "ERROR: Incorrect passphrase entered.\n");
							is_error = true;
						}
					}
					continue;
				}
				if (tokencmp(tokens[0], "setclass")) {
					bool _result = wait(makeInterruptable(setClassCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "begin")) {
					if (tokens.size() != 1) {
						printUsage(tokens[0]);
						is_error = true;
					} else if (intrans) {
						fprintf(stderr, "ERROR: Already in transaction\n");
						is_error = true;
					} else {
						// The explicit transaction starts from a copy of the global options.
						activeOptions = FdbOptions(globalOptions);
						options = &activeOptions;
						getTransaction(db, tr, options, false);
						intrans = true;
						printf("Transaction started\n");
					}
					continue;
				}
				if (tokencmp(tokens[0], "commit")) {
					if (tokens.size() != 1) {
						printUsage(tokens[0]);
						is_error = true;
					} else if (!intrans) {
						fprintf(stderr, "ERROR: No active transaction\n");
						is_error = true;
					} else {
						wait(commitTransaction(tr));
						intrans = false;
						options = &globalOptions;
					}
					continue;
				}
				if (tokencmp(tokens[0], "reset")) {
					if (tokens.size() != 1) {
						printUsage(tokens[0]);
						is_error = true;
					} else if (!intrans) {
						fprintf(stderr, "ERROR: No active transaction\n");
						is_error = true;
					} else {
						// Stay inside the transaction, but discard its state and per-transaction options.
						tr->reset();
						activeOptions = FdbOptions(globalOptions);
						options = &activeOptions;
						options->apply(tr);
						printf("Transaction reset\n");
					}
					continue;
				}
				if (tokencmp(tokens[0], "rollback")) {
					if (tokens.size() != 1) {
						printUsage(tokens[0]);
						is_error = true;
					} else if (!intrans) {
						fprintf(stderr, "ERROR: No active transaction\n");
						is_error = true;
					} else {
						intrans = false;
						options = &globalOptions;
						printf("Transaction rolled back\n");
					}
					continue;
				}
				if (tokencmp(tokens[0], "get")) {
					if (tokens.size() != 2) {
						printUsage(tokens[0]);
						is_error = true;
					} else {
						state ThreadFuture<Optional<Value>> valueF =
						    getTransaction(db, tr, options, intrans)->get(tokens[1]);
						Optional<Standalone<StringRef>> v = wait(makeInterruptable(safeThreadFutureToFuture(valueF)));
						if (v.present())
							printf("`%s' is `%s'\n", printable(tokens[1]).c_str(), printable(v.get()).c_str());
						else
							printf("`%s': not found\n", printable(tokens[1]).c_str());
					}
					continue;
				}
				if (tokencmp(tokens[0], "getversion")) {
					if (tokens.size() != 1) {
						printUsage(tokens[0]);
						is_error = true;
					} else {
						Version v = wait(makeInterruptable(
						    safeThreadFutureToFuture(getTransaction(db, tr, options, intrans)->getReadVersion())));
						// "%ld" only matches where `long` is 64 bits wide; cast to `long long` so the format
						// and argument agree on every platform (presumably Version is a 64-bit integer).
						printf("%lld\n", static_cast<long long>(v));
					}
					continue;
				}
				if (tokencmp(tokens[0], "advanceversion")) {
					bool _result = wait(makeInterruptable(advanceVersionCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "kill")) {
					getTransaction(db, tr, options, intrans);
					bool _result = wait(makeInterruptable(killCommandActor(db, tr, tokens, &address_interface)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "suspend")) {
					getTransaction(db, tr, options, intrans);
					bool _result = wait(makeInterruptable(suspendCommandActor(db, tr, tokens, &address_interface)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "force_recovery_with_data_loss")) {
					bool _result = wait(makeInterruptable(forceRecoveryWithDataLossCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "maintenance")) {
					bool _result = wait(makeInterruptable(maintenanceCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "consistencycheck")) {
					getTransaction(db, tr, options, intrans);
					bool _result = wait(makeInterruptable(consistencyCheckCommandActor(tr, tokens, intrans)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "profile")) {
					getTransaction(db, tr, options, intrans);
					bool _result = wait(makeInterruptable(profileCommandActor(tr, tokens, intrans)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "expensive_data_check")) {
					getTransaction(db, tr, options, intrans);
					bool _result =
					    wait(makeInterruptable(expensiveDataCheckCommandActor(db, tr, tokens, &address_interface)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "getrange") ||
				    tokencmp(tokens[0], "getrangekeys")) { // FIXME: support byte limits, and reverse range reads
					if (tokens.size() < 2 || tokens.size() > 4) {
						printUsage(tokens[0]);
						is_error = true;
					} else {
						state int limit;
						bool valid = true;
						if (tokens.size() == 4) {
							// INT_MAX is 10 digits; rather than
							// worrying about overflow we'll just cap
							// limit at the (already absurd)
							// nearly-a-billion
							if (tokens[3].size() > 9) {
								fprintf(stderr, "ERROR: bad limit\n");
								is_error = true;
								continue;
							}
							// Accumulate decimal digits from least to most significant; any other
							// character makes the limit invalid.
							limit = 0;
							int place = 1;
							for (int i = tokens[3].size(); i > 0; i--) {
								int val = int(tokens[3][i - 1]) - int('0');
								if (val < 0 || val > 9) {
									valid = false;
									break;
								}
								limit += val * place;
								place *= 10;
							}
							if (!valid) {
								fprintf(stderr, "ERROR: bad limit\n");
								is_error = true;
								continue;
							}
						} else {
							limit = 25;
						}
						// Without an explicit end key, derive one from the begin key.
						Standalone<StringRef> endKey;
						if (tokens.size() >= 3) {
							endKey = tokens[2];
						} else if (tokens[1].size() == 0) {
							endKey = normalKeys.end;
						} else if (tokens[1] == systemKeys.begin) {
							endKey = systemKeys.end;
						} else if (tokens[1] >= allKeys.end) {
							throw key_outside_legal_range();
						} else {
							endKey = strinc(tokens[1]);
						}
						state ThreadFuture<RangeResult> kvsF =
						    getTransaction(db, tr, options, intrans)->getRange(KeyRangeRef(tokens[1], endKey), limit);
						RangeResult kvs = wait(makeInterruptable(safeThreadFutureToFuture(kvsF)));
						printf("\nRange limited to %d keys\n", limit);
						// Decide once whether values are printed; a range-for also avoids shadowing the
						// command iterator `iter` of the enclosing loop.
						const bool keysOnly = tokencmp(tokens[0], "getrangekeys");
						for (const auto& kv : kvs) {
							if (keysOnly)
								printf("`%s'\n", printable(kv.key).c_str());
							else
								printf("`%s' is `%s'\n", printable(kv.key).c_str(), printable(kv.value).c_str());
						}
						printf("\n");
					}
					continue;
				}
				if (tokencmp(tokens[0], "writemode")) {
					if (tokens.size() != 2) {
						printUsage(tokens[0]);
						is_error = true;
					} else {
						if (tokencmp(tokens[1], "on")) {
							writeMode = true;
						} else if (tokencmp(tokens[1], "off")) {
							writeMode = false;
						} else {
							printUsage(tokens[0]);
							is_error = true;
						}
					}
					continue;
				}
				if (tokencmp(tokens[0], "set")) {
					if (!writeMode) {
						fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n");
						is_error = true;
						continue;
					}
					if (tokens.size() != 3) {
						printUsage(tokens[0]);
						is_error = true;
					} else {
						getTransaction(db, tr, options, intrans);
						tr->set(tokens[1], tokens[2]);
						// Outside an explicit transaction each mutation is committed immediately.
						if (!intrans) {
							wait(commitTransaction(tr));
						}
					}
					continue;
				}
				if (tokencmp(tokens[0], "clear")) {
					if (!writeMode) {
						fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n");
						is_error = true;
						continue;
					}
					if (tokens.size() != 2) {
						printUsage(tokens[0]);
						is_error = true;
					} else {
						getTransaction(db, tr, options, intrans);
						tr->clear(tokens[1]);
						if (!intrans) {
							wait(commitTransaction(tr));
						}
					}
					continue;
				}
				if (tokencmp(tokens[0], "clearrange")) {
					if (!writeMode) {
						fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n");
						is_error = true;
						continue;
					}
					if (tokens.size() != 3) {
						printUsage(tokens[0]);
						is_error = true;
					} else {
						getTransaction(db, tr, options, intrans);
						tr->clear(KeyRangeRef(tokens[1], tokens[2]));
						if (!intrans) {
							wait(commitTransaction(tr));
						}
					}
					continue;
				}
				if (tokencmp(tokens[0], "datadistribution")) {
					bool _result = wait(makeInterruptable(dataDistributionCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "option")) {
					// Accepted forms: `option` (list enabled options) and `option on|off <name> [arg]`.
					if (tokens.size() == 2 || tokens.size() > 4) {
						printUsage(tokens[0]);
						is_error = true;
					} else {
						if (tokens.size() == 1) {
							if (options->hasAnyOptionsEnabled()) {
								printf("\nCurrently enabled options:\n\n");
								options->print();
								printf("\n");
							} else
								fprintf(stderr, "There are no options enabled\n");
							continue;
						}
						bool isOn;
						if (tokencmp(tokens[1], "on")) {
							isOn = true;
						} else if (tokencmp(tokens[1], "off")) {
							if (intrans) {
								fprintf(
								    stderr,
								    "ERROR: Cannot turn option off when using a transaction created with `begin'\n");
								is_error = true;
								continue;
							}
							if (tokens.size() > 3) {
								fprintf(stderr, "ERROR: Cannot specify option argument when turning option off\n");
								is_error = true;
								continue;
							}
							isOn = false;
						} else {
							fprintf(stderr,
							        "ERROR: Invalid option state `%s': option must be turned `on' or `off'\n",
							        formatStringRef(tokens[1]).c_str());
							is_error = true;
							continue;
						}
						Optional<StringRef> arg = (tokens.size() > 3) ? tokens[3] : Optional<StringRef>();
						try {
							options->setOption(tr, tokens[2], isOn, arg, intrans);
							printf("Option %s for %s\n",
							       isOn ? "enabled" : "disabled",
							       intrans ? "current transaction" : "all transactions");
						} catch (Error& e) {
							// options->setOption() prints error message
							TraceEvent(SevWarn, "CLISetOptionError").error(e).detail("Option", tokens[2]);
							is_error = true;
						}
					}
					continue;
				}
				if (tokencmp(tokens[0], "throttle")) {
					bool _result = wait(makeInterruptable(throttleCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				if (tokencmp(tokens[0], "cache_range")) {
					bool _result = wait(makeInterruptable(cacheRangeCommandActor(db, tokens)));
					if (!_result)
						is_error = true;
					continue;
				}
				// Listed in helpMap/hiddenCommands but not handled by any branch above.
				fprintf(stderr, "ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
				is_error = true;
			}
			TraceEvent(SevInfo, "CLICommandLog", randomID).detail("Command", line).detail("IsError", is_error);
		} catch (Error& e) {
			// Cancellation is not reported to the user; every other error is.
			if (e.code() != error_code_actor_cancelled)
				fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code());
			is_error = true;
			// Any error aborts an explicit transaction and restores the global options.
			if (intrans) {
				printf("Rolling back current transaction\n");
				intrans = false;
				options = &globalOptions;
				options->apply(tr);
			}
		}
		// An exec line runs exactly once; its success or failure becomes the exit code.
		if (opt.exec.present()) {
			return is_error ? 1 : 0;
		}
	}
}