in fdbserver/fdbserver.actor.cpp [1669:2273]
// Entry point of the server binary.
//
// In order, this function:
//   1. performs platform/crash-handler/stdio setup,
//   2. parses the command line into `opts` and applies knob overrides,
//   3. runs the self-contained test roles (SkipListTest, DSLTest, VersionedMapTest) and exits,
//   4. brings up either the simulator or a real network (g_network + FlowTransport),
//   5. dispatches on `opts.role`, running the network/simulator loop where applicable,
//   6. derives an exit code from the result future (and, in simulation, from SevError counts)
//      and leaves through flushAndExit().
//
// Errors escaping the body are reported on stderr and as a "MainError" trace event, and the process
// exits with FDB_EXIT_MAIN_ERROR (Error) or FDB_EXIT_MAIN_EXCEPTION (boost / std exceptions).
int main(int argc, char* argv[]) {
    // TODO: Remove later, this is just to force the statics to be initialized
    // otherwise the unit test won't run
#ifdef ENABLE_SAMPLING
    ActorLineageSet _;
#endif
    try {
        platformInit();

#ifdef ALLOC_INSTRUMENTATION
        g_extra_memory = new uint8_t[1000000];
#endif
        registerCrashHandler();

        // Set default of line buffering standard out and error
        setvbuf(stdout, nullptr, _IOLBF, BUFSIZ);
        setvbuf(stderr, nullptr, _IOLBF, BUFSIZ);

        // Enables profiling on this thread (but does not start it)
        registerThreadForProfiling();

#ifdef _WIN32
        // Windows needs a gentle nudge to format floats correctly
        //_set_output_format(_TWO_DIGIT_EXPONENT);
#endif

        const auto opts = CLIOptions::parseArgs(argc, argv);
        const auto role = opts.role;

#ifdef _WIN32
        // For now, ignore all tests for Windows
        if (role == ServerRole::Simulation || role == ServerRole::UnitTests || role == ServerRole::Test) {
            printf("Windows tests are not supported yet\n");
            flushAndExit(FDB_EXIT_SUCCESS);
        }
#endif

        if (role == ServerRole::Simulation)
            printf("Random seed is %u...\n", opts.randomSeed);

        if (opts.zoneId.present())
            printf("ZoneId set to %s, dcId to %s\n", printable(opts.zoneId).c_str(), printable(opts.dcId).c_str());

        setThreadLocalDeterministicRandomSeed(opts.randomSeed);

        enableBuggify(opts.buggifyEnabled, BuggifyType::General);
        enableFaultInjection(opts.faultInjectionEnabled);

        // ---- Knob setup -------------------------------------------------------------------------------
        IKnobCollection::setGlobalKnobCollection(IKnobCollection::Type::SERVER,
                                                 Randomize::True,
                                                 role == ServerRole::Simulation ? IsSimulated::True
                                                                                : IsSimulated::False);
        IKnobCollection::getMutableGlobalKnobCollection().setKnob("log_directory", KnobValue::create(opts.logFolder));
        if (role != ServerRole::Simulation) {
            IKnobCollection::getMutableGlobalKnobCollection().setKnob("commit_batches_mem_bytes_hard_limit",
                                                                      KnobValue::create(int64_t{ opts.memLimit }));
        }

        // Apply every knob given on the command line. An unparsable value is only a warning; any other
        // failure is logged and rethrown (it ends up in the outer catch handlers).
        for (const auto& [knobName, knobValueString] : opts.knobs) {
            try {
                auto& g_knobs = IKnobCollection::getMutableGlobalKnobCollection();
                auto knobValue = g_knobs.parseKnobValue(knobName, knobValueString);
                g_knobs.setKnob(knobName, knobValue);
            } catch (Error& e) {
                if (e.code() == error_code_invalid_option_value) {
                    // Argument order must follow the format string: value first, then knob name.
                    fprintf(stderr,
                            "WARNING: Invalid value '%s' for knob option '%s'\n",
                            knobValueString.c_str(),
                            knobName.c_str());
                    TraceEvent(SevWarnAlways, "InvalidKnobValue")
                        .detail("Knob", printable(knobName))
                        .detail("Value", printable(knobValueString));
                } else {
                    fprintf(stderr, "ERROR: Failed to set knob option '%s': %s\n", knobName.c_str(), e.what());
                    TraceEvent(SevError, "FailedToSetKnob")
                        .detail("Knob", printable(knobName))
                        .detail("Value", printable(knobValueString))
                        .error(e);
                    throw;
                }
            }
        }
        IKnobCollection::getMutableGlobalKnobCollection().setKnob("server_mem_limit",
                                                                  KnobValue::create(int64_t{ opts.memLimit }));
        // Reinitialize knobs in order to update knobs that are dependent on explicitly set knobs
        IKnobCollection::getMutableGlobalKnobCollection().initialize(
            Randomize::True, role == ServerRole::Simulation ? IsSimulated::True : IsSimulated::False);

        // evictionPolicyStringToEnum will throw an exception if the string is not recognized as a valid
        // eviction policy; the result is discarded, the call is made purely for that validation.
        EvictablePageCache::evictionPolicyStringToEnum(FLOW_KNOBS->CACHE_EVICTION_POLICY);

        if (opts.memLimit <= FLOW_KNOBS->PAGE_CACHE_4K) {
            fprintf(stderr, "ERROR: --memory has to be larger than --cache-memory\n");
            flushAndExit(FDB_EXIT_ERROR);
        }

        // ---- Stand-alone test roles: run and exit before any network/simulator is created ------------
        if (role == ServerRole::SkipListTest) {
            skipListTest();
            flushAndExit(FDB_EXIT_SUCCESS);
        }

        if (role == ServerRole::DSLTest) {
            dsltest();
            flushAndExit(FDB_EXIT_SUCCESS);
        }

        if (role == ServerRole::VersionedMapTest) {
            versionedMapTest();
            flushAndExit(FDB_EXIT_SUCCESS);
        }

        // Initialize the thread pool
        CoroThreadPool::init();
        // Ordinarily, this is done when the network is run. However, network thread should be set before TraceEvents
        // are logged. This thread will eventually run the network, so call it now.
        TraceEvent::setNetworkThread();

        // ---- Network / simulator bring-up -------------------------------------------------------------
        // Futures returned by FlowTransport::bind(); the FDBD role waits on them alongside its own actors.
        std::vector<Future<Void>> listenErrors;

        if (role == ServerRole::Simulation || role == ServerRole::CreateTemplateDatabase) {
            // startOldSimulator();
            startNewSimulator(opts.printSimTime);
            openTraceFile(NetworkAddress(), opts.rollsize, opts.maxLogsSize, opts.logFolder, "trace", opts.logGroup);
            openTracer(TracerType(deterministicRandom()->randomInt(static_cast<int>(TracerType::DISABLED),
                                                                   static_cast<int>(TracerType::SIM_END))));
        } else {
            g_network = newNet2(opts.tlsConfig, opts.useThreadPool, true);
            g_network->addStopCallback(Net2FileSystem::stop);
            FlowTransport::createInstance(false, 1, WLTOKEN_RESERVED_COUNT);

            // Only these roles listen for incoming connections and therefore need a public address.
            const bool expectsPublicAddress =
                (role == ServerRole::FDBD || role == ServerRole::NetworkTestServer || role == ServerRole::Restore);
            if (opts.publicAddressStrs.empty()) {
                if (expectsPublicAddress) {
                    fprintf(stderr, "ERROR: The -p or --public-address option is required\n");
                    printHelpTeaser(argv[0]);
                    flushAndExit(FDB_EXIT_ERROR);
                }
            }

            openTraceFile(
                opts.publicAddresses.address, opts.rollsize, opts.maxLogsSize, opts.logFolder, "trace", opts.logGroup);
            g_network->initTLS();

            if (expectsPublicAddress) {
                // Bind the primary address and, when present, the secondary one.
                for (int ii = 0; ii < (opts.publicAddresses.secondaryAddress.present() ? 2 : 1); ++ii) {
                    const NetworkAddress& publicAddress =
                        ii == 0 ? opts.publicAddresses.address : opts.publicAddresses.secondaryAddress.get();
                    const NetworkAddress& listenAddress =
                        ii == 0 ? opts.listenAddresses.address : opts.listenAddresses.secondaryAddress.get();
                    try {
                        const Future<Void>& errorF = FlowTransport::transport().bind(publicAddress, listenAddress);
                        listenErrors.push_back(errorF);
                        // If bind already finished, get() surfaces an immediate failure as an exception.
                        if (errorF.isReady())
                            errorF.get();
                    } catch (Error& e) {
                        TraceEvent("BindError").error(e);
                        fprintf(stderr,
                                "Error initializing networking with public address %s and listen address %s (%s)\n",
                                publicAddress.toString().c_str(),
                                listenAddress.toString().c_str(),
                                e.what());
                        printHelpTeaser(argv[0]);
                        flushAndExit(FDB_EXIT_ERROR);
                    }
                }
            }

            // Use a negative ioTimeout to indicate warn-only
            Net2FileSystem::newFileSystem(opts.fileIoWarnOnly ? -opts.fileIoTimeout : opts.fileIoTimeout,
                                          opts.fileSystemPath);
            g_network->initMetrics();
            FlowTransport::transport().initMetrics();
            initTraceEventMetrics();
        }

        // Reference points for the "ElapsedTime" report at the end of main.
        double start = timer(), startNow = now();

        std::string cwd = "<unknown>";
        try {
            cwd = platform::getWorkingDirectory();
        } catch (Error& e) {
            // Allow for platform error by rethrowing all _other_ errors
            if (e.code() != error_code_platform_error)
                throw;
        }

        TraceEvent("ProgramStart")
            .setMaxEventLength(12000)
            .detail("RandomSeed", opts.randomSeed)
            .detail("SourceVersion", getSourceVersion())
            .detail("Version", FDB_VT_VERSION)
            .detail("PackageName", FDB_VT_PACKAGE_NAME)
            .detail("FileSystem", opts.fileSystemPath)
            .detail("DataFolder", opts.dataFolder)
            .detail("WorkingDirectory", cwd)
            .detail("ClusterFile", opts.connectionFile ? opts.connectionFile->toString() : "")
            .detail("ConnectionString",
                    opts.connectionFile ? opts.connectionFile->getConnectionString().toString() : "")
            .detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(nullptr))
            .setMaxFieldLength(10000)
            .detail("CommandLine", opts.commandLine)
            .setMaxFieldLength(0)
            .detail("BuggifyEnabled", opts.buggifyEnabled)
            .detail("FaultInjectionEnabled", opts.faultInjectionEnabled)
            .detail("MemoryLimit", opts.memLimit)
            .trackLatest("ProgramStart");

        Error::init();
        std::set_new_handler(&platform::outOfMemory);
        setMemoryQuota(opts.memLimit);

        // Result of the role's top-level actor. It is inspected after the run loop returns: a ready,
        // non-error future holding an empty Optional maps to FDB_EXIT_ERROR.
        Future<Optional<Void>> f;

        // ---- Role dispatch ----------------------------------------------------------------------------
        if (role == ServerRole::Simulation) {
            TraceEvent("Simulation").detail("TestFile", opts.testFile);

            // Held in a local so the returned future stays alive for the rest of this branch.
            auto histogramReportActor = histogramReport();

            CLIENT_KNOBS->trace();
            FLOW_KNOBS->trace();
            SERVER_KNOBS->trace();

            auto dataFolder = opts.dataFolder.size() ? opts.dataFolder : "simfdb";

            // Refuse to run in a folder containing sub-directories that do not look like ours: anything
            // whose name is not 32 characters long, not in the allow-list and does not contain "snap".
            std::vector<std::string> directories = platform::listDirectories(dataFolder);
            const std::set<std::string> allowedDirectories = { ".", "..", "backups", "unittests" };

            for (const auto& dir : directories) {
                if (dir.size() != 32 && allowedDirectories.count(dir) == 0 && dir.find("snap") == std::string::npos) {
                    TraceEvent(SevError, "IncompatibleDirectoryFound")
                        .detail("DataFolder", dataFolder)
                        .detail("SuspiciousFile", dir);
                    fprintf(stderr,
                            "ERROR: Data folder `%s' had non fdb file `%s'; please use clean, fdb-only folder\n",
                            dataFolder.c_str(),
                            dir.c_str());
                    flushAndExit(FDB_EXIT_ERROR);
                }
            }

            // A fresh run tolerates at most a lone restartInfo.ini; a restarting run needs some file.
            std::vector<std::string> files = platform::listFiles(dataFolder);
            if ((files.size() > 1 || (files.size() == 1 && files[0] != "restartInfo.ini")) && !opts.restarting) {
                TraceEvent(SevError, "IncompatibleFileFound").detail("DataFolder", dataFolder);
                fprintf(stderr,
                        "ERROR: Data folder `%s' is non-empty; please use clean, fdb-only folder\n",
                        dataFolder.c_str());
                flushAndExit(FDB_EXIT_ERROR);
            } else if (files.empty() && opts.restarting) {
                TraceEvent(SevWarnAlways, "FileNotFound").detail("DataFolder", dataFolder);
                printf("ERROR: Data folder `%s' is empty, but restarting option selected. Run Phase 1 test first\n",
                       dataFolder.c_str());
                flushAndExit(FDB_EXIT_ERROR);
            }

            int isRestoring = 0;
            if (!opts.restarting) {
                // Fresh run: start from an empty data folder.
                platform::eraseDirectoryRecursive(dataFolder);
                platform::createDirectory(dataFolder);
            } else {
                // Restarting run: consult restartInfo.ini to decide whether snapshot data must be restored.
                CSimpleIni ini;
                ini.SetUnicode();
                std::string absDataFolder = abspath(dataFolder);
                ini.LoadFile(joinPath(absDataFolder, "restartInfo.ini").c_str());
                // Defaults to "failed"; only overwritten when isRestoring is set and BackupFailed is present.
                int backupFailed = true;
                const char* isRestoringStr = ini.GetValue("RESTORE", "isRestoring", nullptr);
                if (isRestoringStr) {
                    isRestoring = atoi(isRestoringStr);
                    const char* backupFailedStr = ini.GetValue("RESTORE", "BackupFailed", nullptr);
                    if (isRestoring && backupFailedStr) {
                        backupFailed = atoi(backupFailedStr);
                    }
                }
                if (isRestoring && !backupFailed) {
                    std::string ext = "";
                    const std::vector<std::string> returnList = platform::listDirectories(absDataFolder);
                    // NOTE(review): no default is passed here. If GetValue can yield nullptr for a missing
                    // key (as the explicit nullptr defaults above suggest), constructing a std::string from
                    // it is invalid — confirm RestoreSnapUID is always written when isRestoring is set.
                    std::string snapStr = ini.GetValue("RESTORE", "RestoreSnapUID");

                    TraceEvent("RestoringDataFolder").detail("DataFolder", absDataFolder);
                    TraceEvent("RestoreSnapUID").detail("UID", snapStr);

                    // delete all files (except fdb.cluster) in non-snap directories
                    for (const auto& dirEntry : returnList) {
                        if (dirEntry == "." || dirEntry == "..") {
                            continue;
                        }
                        if (dirEntry.find(snapStr) != std::string::npos) {
                            continue;
                        }

                        std::string childf = absDataFolder + "/" + dirEntry;
                        std::vector<std::string> returnFiles = platform::listFiles(childf, ext);
                        for (const auto& fileEntry : returnFiles) {
                            if (fileEntry != "fdb.cluster" && fileEntry != "fitness") {
                                TraceEvent("DeletingNonSnapfiles").detail("FileBeingDeleted", childf + "/" + fileEntry);
                                deleteFile(childf + "/" + fileEntry);
                            }
                        }
                    }
                    // cleanup unwanted and partial directories
                    for (const auto& dirEntry : returnList) {
                        if (dirEntry == "." || dirEntry == "..") {
                            continue;
                        }
                        std::string dirSrc = absDataFolder + "/" + dirEntry;
                        // delete snap directories which are not part of restoreSnapUID
                        if (dirEntry.find(snapStr) == std::string::npos) {
                            if (dirEntry.find("snap") != std::string::npos) {
                                platform::eraseDirectoryRecursive(dirSrc);
                            }
                            continue;
                        }
                        // remove empty/partial snap directories
                        std::vector<std::string> childrenList = platform::listFiles(dirSrc);
                        if (childrenList.size() == 0) {
                            TraceEvent("RemovingEmptySnapDirectory").detail("DirBeingDeleted", dirSrc);
                            platform::eraseDirectoryRecursive(dirSrc);
                            continue;
                        }
                    }
                    // move snapshotted files to appropriate locations
                    for (const auto& dirEntry : returnList) {
                        if (dirEntry == "." || dirEntry == "..") {
                            continue;
                        }
                        std::string dirSrc = absDataFolder + "/" + dirEntry;
                        // The destination directory name is the first 32 characters of the snap directory name.
                        std::string origDir = dirEntry.substr(0, 32);
                        std::string dirToMove = absDataFolder + "/" + origDir;
                        if ((dirEntry.find("snap") != std::string::npos) &&
                            (dirEntry.find("tlog") != std::string::npos)) {
                            // restore tlog files
                            restoreRoleFilesHelper(dirSrc, dirToMove, "log");
                        } else if ((dirEntry.find("snap") != std::string::npos) &&
                                   (dirEntry.find("storage") != std::string::npos)) {
                            // restore storage files
                            restoreRoleFilesHelper(dirSrc, dirToMove, "storage");
                        } else if ((dirEntry.find("snap") != std::string::npos) &&
                                   (dirEntry.find("coord") != std::string::npos)) {
                            // restore coordinator files
                            restoreRoleFilesHelper(dirSrc, dirToMove, "coordination");
                        }
                    }
                }
            }
            setupAndRun(dataFolder, opts.testFile, opts.restarting, (isRestoring >= 1), opts.whitelistBinPaths);
            g_simulator.run();
        } else if (role == ServerRole::FDBD) {
            // Update the global blob credential files list so that both fast
            // restore workers and backup workers can access blob storage.
            std::vector<std::string>* pFiles =
                (std::vector<std::string>*)g_network->global(INetwork::enBlobCredentialFiles);
            if (pFiles != nullptr) {
                for (const auto& credentialFile : opts.blobCredentials) {
                    pFiles->push_back(credentialFile);
                }
            }

            // Call fast restore for the class FastRestoreClass. This is a short-cut to run fast restore in circus
            if (opts.processClass == ProcessClass::FastRestoreClass) {
                printf("Run as fast restore worker\n");
                ASSERT(opts.connectionFile);
                auto dataFolder = opts.dataFolder;
                if (!dataFolder.size())
                    dataFolder = format("fdb/%d/", opts.publicAddresses.address.port); // SOMEDAY: Better default

                std::vector<Future<Void>> actors(listenErrors.begin(), listenErrors.end());
                actors.push_back(restoreWorker(opts.connectionFile, opts.localities, dataFolder));
                f = stopAfter(waitForAll(actors));
                printf("Fast restore worker started\n");
                g_network->run();
                printf("g_network->run() done\n");
            } else { // Call fdbd roles in conventional way
                ASSERT(opts.connectionFile);

                setupRunLoopProfiler();

                auto dataFolder = opts.dataFolder;
                if (!dataFolder.size())
                    dataFolder = format("fdb/%d/", opts.publicAddresses.address.port); // SOMEDAY: Better default

                std::vector<Future<Void>> actors(listenErrors.begin(), listenErrors.end());
                // dataFolder is deliberately passed twice, exactly as in the original call.
                actors.push_back(fdbd(opts.connectionFile,
                                      opts.localities,
                                      opts.processClass,
                                      dataFolder,
                                      dataFolder,
                                      opts.storageMemLimit,
                                      opts.metricsConnFile,
                                      opts.metricsPrefix,
                                      opts.rsssize,
                                      opts.whitelistBinPaths,
                                      opts.configPath,
                                      opts.manualKnobOverrides,
                                      opts.configDBType));
                actors.push_back(histogramReport());
                // actors.push_back( recurring( []{}, .001 ) );  // for ASIO latency measurement

                f = stopAfter(waitForAll(actors));
                g_network->run();
            }
        } else if (role == ServerRole::MultiTester) {
            setupRunLoopProfiler();
            f = stopAfter(runTests(opts.connectionFile,
                                   TEST_TYPE_FROM_FILE,
                                   opts.testOnServers ? TEST_ON_SERVERS : TEST_ON_TESTERS,
                                   opts.minTesterCount,
                                   opts.testFile,
                                   StringRef(),
                                   opts.localities));
            g_network->run();
        } else if (role == ServerRole::Test) {
            setupRunLoopProfiler();
            // `m` keeps the system monitor's future alive while the network runs.
            auto m = startSystemMonitor(opts.dataFolder, opts.dcId, opts.zoneId, opts.zoneId);
            f = stopAfter(runTests(
                opts.connectionFile, TEST_TYPE_FROM_FILE, TEST_HERE, 1, opts.testFile, StringRef(), opts.localities));
            g_network->run();
        } else if (role == ServerRole::ConsistencyCheck) {
            setupRunLoopProfiler();

            auto m = startSystemMonitor(opts.dataFolder, opts.dcId, opts.zoneId, opts.zoneId);
            f = stopAfter(runTests(opts.connectionFile,
                                   TEST_TYPE_CONSISTENCY_CHECK,
                                   TEST_HERE,
                                   1,
                                   opts.testFile,
                                   StringRef(),
                                   opts.localities));
            g_network->run();
        } else if (role == ServerRole::UnitTests) {
            setupRunLoopProfiler();
            auto m = startSystemMonitor(opts.dataFolder, opts.dcId, opts.zoneId, opts.zoneId);
            f = stopAfter(runTests(opts.connectionFile,
                                   TEST_TYPE_UNIT_TESTS,
                                   TEST_HERE,
                                   1,
                                   opts.testFile,
                                   StringRef(),
                                   opts.localities,
                                   opts.testParams));
            g_network->run();
        } else if (role == ServerRole::CreateTemplateDatabase) {
            createTemplateDatabase();
        } else if (role == ServerRole::NetworkTestClient) {
            f = stopAfter(networkTestClient(opts.testServersStr));
            g_network->run();
        } else if (role == ServerRole::NetworkTestServer) {
            f = stopAfter(networkTestServer());
            g_network->run();
        } else if (role == ServerRole::Restore) {
            f = stopAfter(restoreWorker(opts.connectionFile, opts.localities, opts.dataFolder));
            g_network->run();
        } else if (role == ServerRole::KVFileIntegrityCheck) {
            f = stopAfter(KVFileCheck(opts.kvFile, true));
            g_network->run();
        } else if (role == ServerRole::KVFileGenerateIOLogChecksums) {
            // Runs synchronously; on failure `result` stays empty, which yields FDB_EXIT_ERROR below.
            Optional<Void> result;
            try {
                GenerateIOLogChecksumFile(opts.kvFile);
                result = Void();
            } catch (Error& e) {
                fprintf(stderr, "Fatal Error: %s\n", e.what());
            }

            f = result;
        } else if (role == ServerRole::KVFileDump) {
            f = stopAfter(KVFileDump(opts.kvFile));
            g_network->run();
        }

        // ---- Exit code and final reporting ------------------------------------------------------------
        int rc = FDB_EXIT_SUCCESS;
        if (f.isValid() && f.isReady() && !f.isError() && !f.get().present()) {
            rc = FDB_EXIT_ERROR;
        }

        int unseed = noUnseed ? 0 : deterministicRandom()->randomInt(0, 100001);
        TraceEvent("ElapsedTime")
            .detail("SimTime", now() - startNow)
            .detail("RealTime", timer() - start)
            .detail("RandomUnseed", unseed);

        if (role == ServerRole::Simulation) {
            printf("Unseed: %d\n", unseed);
            printf("Elapsed: %f simsec, %f real seconds\n", now() - startNow, timer() - start);
        }

        // IFailureMonitor::failureMonitor().address_info.clear();

        // we should have shut down ALL actors associated with this machine; let's list all of the ones still live
        /*{
            auto living = Actor::all;
            printf("%d surviving actors:\n", living.size());
            for(auto a = living.begin(); a != living.end(); ++a)
                printf("  #%lld %s %p\n", (*a)->creationIndex, (*a)->getName(), (*a));
        }

        {
            auto living = DatabaseContext::all;
            printf("%d surviving DatabaseContexts:\n", living.size());
            for(auto a = living.begin(); a != living.end(); ++a)
                printf("  #%lld %p\n", (*a)->creationIndex, (*a));
        }

        {
            auto living = TransactionData::all;
            printf("%d surviving TransactionData(s):\n", living.size());
            for(auto a = living.begin(); a != living.end(); ++a)
                printf("  #%lld %p\n", (*a)->creationIndex, (*a));
        }*/

        /*cout << Actor::allActors.size() << " surviving actors:" << std::endl;
        std::map<std::string,int> actorCount;
        for(int i=0; i<Actor::allActors.size(); i++)
            ++actorCount[Actor::allActors[i]->getName()];
        for(auto i = actorCount.rbegin(); !(i == actorCount.rend()); ++i)
            std::cout << "  " << i->second << " " << i->first << std::endl;*/
        //	std::cout << "  " << Actor::allActors[i]->getName() << std::endl;

        // In simulation, any SevError trace event turns the run into a failure.
        if (role == ServerRole::Simulation) {
            unsigned long sevErrorEventsLogged = TraceEvent::CountEventsLoggedAt(SevError);
            if (sevErrorEventsLogged > 0) {
                printf("%lu SevError events logged\n", sevErrorEventsLogged);
                rc = FDB_EXIT_ERROR;
            }
        }

        // g_simulator.run();

#ifdef ALLOC_INSTRUMENTATION
        {
            std::cout << "Page Counts: " << FastAllocator<16>::pageCount << " " << FastAllocator<32>::pageCount << " "
                      << FastAllocator<64>::pageCount << " " << FastAllocator<128>::pageCount << " "
                      << FastAllocator<256>::pageCount << " " << FastAllocator<512>::pageCount << " "
                      << FastAllocator<1024>::pageCount << " " << FastAllocator<2048>::pageCount << " "
                      << FastAllocator<4096>::pageCount << " " << FastAllocator<8192>::pageCount << " "
                      << FastAllocator<16384>::pageCount << std::endl;

            // Pairs of (display name, original allocInstr key), sorted by display name before printing.
            std::vector<std::pair<std::string, const char*>> typeNames;
            for (auto i = allocInstr.begin(); i != allocInstr.end(); ++i) {
                std::string s;

#ifdef __linux__
                char* demangled = abi::__cxa_demangle(i->first, nullptr, nullptr, nullptr);
                if (demangled) {
                    s = demangled;
                    if (StringRef(s).startsWith(LiteralStringRef("(anonymous namespace)::")))
                        s = s.substr(LiteralStringRef("(anonymous namespace)::").size());
                    free(demangled);
                } else
                    s = i->first;
#else
                s = i->first;
                if (StringRef(s).startsWith(LiteralStringRef("class `anonymous namespace'::")))
                    s = s.substr(LiteralStringRef("class `anonymous namespace'::").size());
                else if (StringRef(s).startsWith(LiteralStringRef("class ")))
                    s = s.substr(LiteralStringRef("class ").size());
                else if (StringRef(s).startsWith(LiteralStringRef("struct ")))
                    s = s.substr(LiteralStringRef("struct ").size());
#endif

                typeNames.emplace_back(s, i->first);
            }
            std::sort(typeNames.begin(), typeNames.end());
            for (const auto& [displayName, rawName] : typeNames) {
                auto& stats = allocInstr[rawName];
                printf("%+d\t%+d\t%d\t%d\t%s\n",
                       stats.allocCount,
                       -stats.deallocCount,
                       stats.allocCount - stats.deallocCount,
                       stats.maxAllocated,
                       displayName.c_str());
            }

            // We're about to exit and clean up data structures, this will wreak havoc on allocation recording
            memSample_entered = true;
        }
#endif
        // printf("\n%d tests passed; %d tests failed\n", passCount, failCount);
        flushAndExit(rc);
    } catch (Error& e) {
        fprintf(stderr, "Error: %s\n", e.what());
        TraceEvent(SevError, "MainError").error(e);
        // printf("\n%d tests passed; %d tests failed\n", passCount, failCount);
        flushAndExit(FDB_EXIT_MAIN_ERROR);
    } catch (boost::system::system_error& e) {
        ASSERT_WE_THINK(false); // boost errors shouldn't leak
        fprintf(stderr, "boost::system::system_error: %s (%d)\n", e.what(), e.code().value());
        TraceEvent(SevError, "MainError").error(unknown_error()).detail("RootException", e.what());
        // printf("\n%d tests passed; %d tests failed\n", passCount, failCount);
        flushAndExit(FDB_EXIT_MAIN_EXCEPTION);
    } catch (std::exception& e) {
        fprintf(stderr, "std::exception: %s\n", e.what());
        TraceEvent(SevError, "MainError").error(unknown_error()).detail("RootException", e.what());
        // printf("\n%d tests passed; %d tests failed\n", passCount, failCount);
        flushAndExit(FDB_EXIT_MAIN_EXCEPTION);
    }

    // Compile-time checks of which interfaces provide load-balancing locality data.
    static_assert(LBLocalityData<StorageServerInterface>::Present, "Storage server interface should be load balanced");
    static_assert(LBLocalityData<CommitProxyInterface>::Present, "Commit proxy interface should be load balanced");
    static_assert(LBLocalityData<GrvProxyInterface>::Present, "GRV proxy interface should be load balanced");
    static_assert(LBLocalityData<TLogInterface>::Present, "TLog interface should be load balanced");
    static_assert(!LBLocalityData<MasterInterface>::Present, "Master interface should not be load balanced");
}