in fdbserver/worker.actor.cpp [1366:2293]
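// workerServer is the top-level actor for an fdbserver worker process: it restores any storage server
// and TLog stores found on disk, registers the worker with the cluster controller, and then loops
// serving recruitment and administrative requests until the process reboots or fails.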
ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
LocalityData locality,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
ProcessClass initialClass,
std::string folder,
int64_t memoryLimit,
std::string metricsConnFile,
std::string metricsPrefix,
Promise<Void> recoveredDiskFiles,
int64_t memoryProfileThreshold,
std::string _coordFolder,
std::string whitelistBinPaths,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
ConfigDBType configDBType,
Reference<LocalConfiguration> localConfig) {
state PromiseStream<ErrorInfo> errors;
state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
new AsyncVar<Optional<DataDistributorInterface>>());
state Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf(new AsyncVar<Optional<RatekeeperInterface>>());
state Reference<AsyncVar<Optional<BlobManagerInterface>>> bmInterf(new AsyncVar<Optional<BlobManagerInterface>>());
state Reference<AsyncVar<Optional<EncryptKeyProxyInterface>>> ekpInterf(
new AsyncVar<Optional<EncryptKeyProxyInterface>>());
state Future<Void> handleErrors = workerHandleErrors(errors.getFuture()); // Needs to be stopped last
state ActorCollection errorForwarders(false);
state Future<Void> loggingTrigger = Void();
state double loggingDelay = SERVER_KNOBS->WORKER_LOGGING_INTERVAL;
state ActorCollection filesClosed(true);
state Promise<Void> stopping;
state WorkerCache<InitializeStorageReply> storageCache;
state Future<Void> metricsLogger;
state Future<Void> chaosMetricsActor;
state Reference<AsyncVar<bool>> degraded = FlowTransport::transport().getDegraded();
// tLogFnForOptions() can return a function that doesn't correspond with the FDB version that the
// TLogVersion represents. This can happen when the newer TLog doesn't support a requested option.
// As (store type, spill type) can map to the same TLogFn across multiple TLogVersions, we need to
// decide whether we should collapse them into the same SharedTLog instance as well. The answer
// here is no, so that when running with log_version==3, all files say V=3.
state std::map<SharedLogsKey, SharedLogsValue> sharedLogs;
state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
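// UID of the most recently started SharedTLog; tLog actors watch this to determine whether they are
// the active instance (see the activeSharedTLog->set() calls below).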
state WorkerCache<InitializeBackupReply> backupWorkerCache;
state std::string coordFolder = abspath(_coordFolder);
state WorkerInterface interf(locality);
state std::set<std::pair<UID, KeyValueStoreType>> runningStorages;
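// (storeID, storeType) of every storage server currently running on this worker; consulted below to
// refuse duplicate storage recruitment with the same storage engine.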
interf.initEndpoints();
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
TraceEvent(SevInfo, "ChaosFeaturesEnabled");
chaosMetricsActor = chaosMetricsLogger();
}
folder = abspath(folder);
if (metricsPrefix.size() > 0) {
if (metricsConnFile.size() > 0) {
try {
state Database db =
Database::createDatabase(metricsConnFile, Database::API_VERSION_LATEST, IsInternal::True, locality);
metricsLogger = runMetrics(db, KeyRef(metricsPrefix));
} catch (Error& e) {
TraceEvent(SevWarnAlways, "TDMetricsBadClusterFile").error(e).detail("ConnFile", metricsConnFile);
}
} else {
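// A metrics prefix in the system keyspace ('\xff') is opened lock-aware, presumably so that metrics
// writes keep working while the database is locked.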
auto lockAware = metricsPrefix.size() && metricsPrefix[0] == '\xff' ? LockAware::True : LockAware::False;
metricsLogger =
runMetrics(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, lockAware), KeyRef(metricsPrefix));
}
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
}
errorForwarders.add(resetAfter(degraded,
SERVER_KNOBS->DEGRADED_RESET_INTERVAL,
false,
SERVER_KNOBS->DEGRADED_WARNING_LIMIT,
SERVER_KNOBS->DEGRADED_WARNING_RESET_DELAY,
"DegradedReset"));
errorForwarders.add(loadedPonger(interf.debugPing.getFuture()));
errorForwarders.add(waitFailureServer(interf.waitFailure.getFuture()));
errorForwarders.add(monitorTraceLogIssues(issues));
errorForwarders.add(testerServerCore(interf.testerInterface, connRecord, dbInfo, locality));
errorForwarders.add(monitorHighMemory(memoryProfileThreshold));
filesClosed.add(stopping.getFuture());
initializeSystemMonitorMachineState(SystemMonitorMachineState(
folder, locality.dcId(), locality.zoneId(), locality.machineId(), g_network->getLocalAddress().ip));
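// Log the endpoint token of every RPC on the worker interface so that requests seen in traces can be
// matched back to this process.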
{
auto recruited = interf;
DUMPTOKEN(recruited.clientInterface.reboot);
DUMPTOKEN(recruited.clientInterface.profiler);
DUMPTOKEN(recruited.tLog);
DUMPTOKEN(recruited.master);
DUMPTOKEN(recruited.commitProxy);
DUMPTOKEN(recruited.grvProxy);
DUMPTOKEN(recruited.resolver);
DUMPTOKEN(recruited.storage);
DUMPTOKEN(recruited.debugPing);
DUMPTOKEN(recruited.coordinationPing);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.setMetricsRate);
DUMPTOKEN(recruited.eventLogRequest);
DUMPTOKEN(recruited.traceBatchDumpRequest);
DUMPTOKEN(recruited.updateServerDBInfo);
}
state std::vector<Future<Void>> recoveries;
try {
std::vector<DiskStore> stores = getDiskStores(folder);
bool validateDataFiles = deleteFile(joinPath(folder, validationFilename));
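// The validation file is created by the checkData reboot path below; if it still exists, the previous
// shutdown requested data validation, so open the stores with validation enabled. deleteFile()
// reports whether the file was present.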
for (int f = 0; f < stores.size(); f++) {
DiskStore s = stores[f];
// FIXME: Error handling
if (s.storedComponent == DiskStore::Storage) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
IKeyValueStore* kv =
openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, false, validateDataFiles);
Future<Void> kvClosed = kv->onClosed();
filesClosed.add(kvClosed);
// std::string doesn't have startsWith
std::string tssPrefix = testingStoragePrefix.toString();
// TODO might be more efficient to mark a boolean on DiskStore in getDiskStores, but that kind of breaks
// the abstraction since DiskStore also applies to storage cache + tlog
bool isTss = s.filename.find(tssPrefix) != std::string::npos;
Role ssRole = isTss ? Role::TESTING_STORAGE_SERVER : Role::STORAGE_SERVER;
StorageServerInterface recruited;
recruited.uniqueID = s.storeID;
recruited.locality = locality;
recruited.tssPairID =
isTss ? Optional<UID>(UID())
: Optional<UID>(); // The presence of the optional is the source of truth for TSS vs. non-TSS;
// the value itself is overridden later in restoreDurableState.
recruited.initEndpoints();
std::map<std::string, std::string> details;
details["StorageEngine"] = s.storeType.toString();
details["IsTSS"] = isTss ? "Yes" : "No";
startRole(ssRole, recruited.id(), interf.id(), details, "Restored");
DUMPTOKEN(recruited.getValue);
DUMPTOKEN(recruited.getKey);
DUMPTOKEN(recruited.getKeyValues);
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getReadHotRanges);
DUMPTOKEN(recruited.getRangeSplitPoints);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
DUMPTOKEN(recruited.getKeyValuesStream);
DUMPTOKEN(recruited.getKeyValuesAndFlatMap);
Promise<Void> recovery;
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connRecord);
recoveries.push_back(recovery.getFuture());
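// Wrap the storage server actor: handleIOErrors surfaces failures of the underlying store through
// the same future, and storageServerRollbackRebooter restarts the role in place when the server
// requests a rollback.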
f = handleIOErrors(f, kv, s.storeID, kvClosed);
f = storageServerRollbackRebooter(&runningStorages,
f,
s.storeType,
s.filename,
recruited.id(),
recruited.locality,
isTss,
dbInfo,
folder,
&filesClosed,
memoryLimit,
kv);
errorForwarders.add(forwardError(errors, ssRole, recruited.id(), f));
} else if (s.storedComponent == DiskStore::TLogData) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::TLog;
std::string logQueueBasename;
const std::string filename = basename(s.filename);
if (StringRef(filename).startsWith(fileLogDataPrefix)) {
logQueueBasename = fileLogQueuePrefix.toString();
} else {
StringRef optionsString = StringRef(filename).removePrefix(fileVersionedLogDataPrefix).eat("-");
logQueueBasename = fileLogQueuePrefix.toString() + optionsString.toString() + "-";
}
ASSERT_WE_THINK(abspath(parentDirectory(s.filename)) == folder);
IKeyValueStore* kv = openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles);
const DiskQueueVersion dqv =
s.tLogOptions.version >= TLogVersion::V3 ? DiskQueueVersion::V1 : DiskQueueVersion::V0;
const int64_t diskQueueWarnSize =
s.tLogOptions.spillType == TLogSpillType::VALUE ? 10 * SERVER_KNOBS->TARGET_BYTES_PER_TLOG : -1;
IDiskQueue* queue = openDiskQueue(joinPath(folder, logQueueBasename + s.storeID.toString() + "-"),
tlogQueueExtension.toString(),
s.storeID,
dqv,
diskQueueWarnSize);
filesClosed.add(kv->onClosed());
filesClosed.add(queue->onClosed());
std::map<std::string, std::string> details;
details["StorageEngine"] = s.storeType.toString();
startRole(Role::SHARED_TRANSACTION_LOG, s.storeID, interf.id(), details, "Restored");
Promise<Void> oldLog;
Promise<Void> recovery;
TLogFn tLogFn = tLogFnForOptions(s.tLogOptions);
auto& logData = sharedLogs[SharedLogsKey(s.tLogOptions, s.storeType)];
// FIXME: If logData.actor is valid but not ready, shouldn't we be sending a fake
// InitializeTLogRequest rather than calling tLogFn() here?
Future<Void> tl =
tLogFn(kv,
queue,
dbInfo,
locality,
!logData.actor.isValid() || logData.actor.isReady() ? logData.requests
: PromiseStream<InitializeTLogRequest>(),
s.storeID,
interf.id(),
true,
oldLog,
recovery,
folder,
degraded,
activeSharedTLog);
recoveries.push_back(recovery.getFuture());
activeSharedTLog->set(s.storeID);
tl = handleIOErrors(tl, kv, s.storeID);
tl = handleIOErrors(tl, queue, s.storeID);
if (!logData.actor.isValid() || logData.actor.isReady()) {
logData.actor = oldLog.getFuture() || tl;
logData.uid = s.storeID;
}
errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl));
}
}
bool hasCache = false;
// start cache role if we have the right process class
if (initialClass.classType() == ProcessClass::StorageCacheClass) {
hasCache = true;
StorageServerInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
std::map<std::string, std::string> details;
startRole(Role::STORAGE_CACHE, recruited.id(), interf.id(), details);
// DUMPTOKEN(recruited.getVersion);
DUMPTOKEN(recruited.getValue);
DUMPTOKEN(recruited.getKey);
DUMPTOKEN(recruited.getKeyValues);
DUMPTOKEN(recruited.getKeyValuesAndFlatMap);
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
auto f = storageCacheServer(recruited, 0, dbInfo);
f = storageCacheRollbackRebooter(f, recruited.id(), recruited.locality, dbInfo);
errorForwarders.add(forwardError(errors, Role::STORAGE_CACHE, recruited.id(), f));
}
std::map<std::string, std::string> details;
details["Locality"] = locality.toString();
details["DataFolder"] = folder;
details["StoresPresent"] = format("%d", stores.size());
details["CachePresent"] = hasCache ? "true" : "false";
startRole(Role::WORKER, interf.id(), interf.id(), details);
errorForwarders.add(traceRole(Role::WORKER, interf.id()));
wait(waitForAll(recoveries));
recoveredDiskFiles.send(Void());
errorForwarders.add(registrationClient(ccInterface,
interf,
asyncPriorityInfo,
initialClass,
ddInterf,
rkInterf,
bmInterf,
ekpInterf,
degraded,
connRecord,
issues,
localConfig,
dbInfo));
if (configDBType != ConfigDBType::DISABLED) {
errorForwarders.add(localConfig->consume(interf.configBroadcastInterface));
}
if (SERVER_KNOBS->ENABLE_WORKER_HEALTH_MONITOR) {
errorForwarders.add(healthMonitor(ccInterface, interf, locality, dbInfo));
}
TraceEvent("RecoveriesComplete", interf.id());
loop choose {
when(UpdateServerDBInfoRequest req = waitNext(interf.updateServerDBInfo.getFuture())) {
ServerDBInfo localInfo = BinaryReader::fromStringRef<ServerDBInfo>(
req.serializedDbInfo, AssumeVersion(g_network->protocolVersion()));
localInfo.myLocality = locality;
if (localInfo.infoGeneration < dbInfo->get().infoGeneration &&
localInfo.clusterInterface == dbInfo->get().clusterInterface) {
std::vector<Endpoint> rep = req.broadcastInfo;
rep.push_back(interf.updateServerDBInfo.getEndpoint());
req.reply.send(rep);
} else {
Optional<Endpoint> notUpdated;
if (!ccInterface->get().present() || localInfo.clusterInterface != ccInterface->get().get()) {
notUpdated = interf.updateServerDBInfo.getEndpoint();
} else if (localInfo.infoGeneration > dbInfo->get().infoGeneration ||
dbInfo->get().clusterInterface != ccInterface->get().get()) {
TraceEvent("GotServerDBInfoChange")
.detail("ChangeID", localInfo.id)
.detail("InfoGeneration", localInfo.infoGeneration)
.detail("MasterID", localInfo.master.id())
.detail("RatekeeperID",
localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
.detail("DataDistributorID",
localInfo.distributor.present() ? localInfo.distributor.get().id() : UID())
.detail("BlobManagerID",
localInfo.blobManager.present() ? localInfo.blobManager.get().id() : UID())
.detail("EncryptKeyProxyID",
localInfo.encryptKeyProxy.present() ? localInfo.encryptKeyProxy.get().id() : UID());
dbInfo->set(localInfo);
}
errorForwarders.add(
success(broadcastDBInfoRequest(req, SERVER_KNOBS->DBINFO_SEND_AMOUNT, notUpdated, true)));
}
}
when(RebootRequest req = waitNext(interf.clientInterface.reboot.getFuture())) {
state RebootRequest rebootReq = req;
// If waitForDuration is INT_MAX, this trace would never be logged if it were inside the next block.
// It is also a useful trace to have even when waitForDuration is 0.
TraceEvent("RebootRequestSuspendingProcess").detail("Duration", req.waitForDuration);
if (req.waitForDuration) {
flushTraceFileVoid();
setProfilingEnabled(0);
g_network->stop();
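// threadSleep intentionally blocks the whole process (the network was stopped above), simulating a
// suspended process for the requested duration.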
threadSleep(req.waitForDuration);
}
if (rebootReq.checkData) {
Reference<IAsyncFile> checkFile =
wait(IAsyncFileSystem::filesystem()->open(joinPath(folder, validationFilename),
IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE,
0600));
wait(checkFile->sync());
}
if (g_network->isSimulated()) {
TraceEvent("SimulatedReboot").detail("Deletion", rebootReq.deleteData);
if (rebootReq.deleteData) {
throw please_reboot_delete();
}
throw please_reboot();
} else {
TraceEvent("ProcessReboot").log();
ASSERT(!rebootReq.deleteData);
flushAndExit(0);
}
}
when(SetFailureInjection req = waitNext(interf.clientInterface.setFailureInjection.getFuture())) {
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
if (req.diskFailure.present()) {
auto diskFailureInjector = DiskFailureInjector::injector();
diskFailureInjector->setDiskFailure(req.diskFailure.get().stallInterval,
req.diskFailure.get().stallPeriod,
req.diskFailure.get().throttlePeriod);
} else if (req.flipBits.present()) {
auto bitFlipper = BitFlipper::flipper();
bitFlipper->setBitFlipPercentage(req.flipBits.get().percentBitFlips);
}
req.reply.send(Void());
} else {
req.reply.sendError(client_invalid_operation());
}
}
when(ProfilerRequest req = waitNext(interf.clientInterface.profiler.getFuture())) {
state ProfilerRequest profilerReq = req;
// There really isn't a great "filepath sanitizer" or "filepath escape" function available,
// thus we instead enforce a different requirement. One can only write to a file that's
// beneath the working directory, and we remove the ability to do any symlink or ../..
// tricks by resolving all paths through `abspath` first.
try {
std::string realLogDir = abspath(SERVER_KNOBS->LOG_DIRECTORY);
std::string realOutPath = abspath(realLogDir + "/" + profilerReq.outputFile.toString());
if (realLogDir.size() < realOutPath.size() &&
strncmp(realLogDir.c_str(), realOutPath.c_str(), realLogDir.size()) == 0) {
profilerReq.outputFile = realOutPath;
uncancellable(runProfiler(profilerReq));
profilerReq.reply.send(Void());
} else {
profilerReq.reply.sendError(client_invalid_operation());
}
} catch (Error& e) {
profilerReq.reply.sendError(e);
}
}
when(RecruitMasterRequest req = waitNext(interf.master.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Master;
MasterInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
startRole(Role::MASTER, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getCommitVersion);
DUMPTOKEN(recruited.getLiveCommittedVersion);
DUMPTOKEN(recruited.reportLiveCommittedVersion);
DUMPTOKEN(recruited.updateRecoveryData);
// printf("Recruited as masterServer\n");
Future<Void> masterProcess = masterServer(
recruited, dbInfo, ccInterface, ServerCoordinators(connRecord), req.lifetime, req.forceRecovery);
errorForwarders.add(
zombie(recruited, forwardError(errors, Role::MASTER, recruited.id(), masterProcess)));
req.reply.send(recruited);
}
when(InitializeDataDistributorRequest req = waitNext(interf.dataDistributor.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::DataDistributor;
DataDistributorInterface recruited(locality, req.reqId);
recruited.initEndpoints();
if (ddInterf->get().present()) {
recruited = ddInterf->get().get();
TEST(true); // Recruited while already a data distributor.
} else {
startRole(Role::DATA_DISTRIBUTOR, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
Future<Void> dataDistributorProcess = dataDistributor(recruited, dbInfo);
errorForwarders.add(forwardError(
errors,
Role::DATA_DISTRIBUTOR,
recruited.id(),
setWhenDoneOrError(dataDistributorProcess, ddInterf, Optional<DataDistributorInterface>())));
ddInterf->set(Optional<DataDistributorInterface>(recruited));
}
TraceEvent("DataDistributorReceived", req.reqId).detail("DataDistributorId", recruited.id());
req.reply.send(recruited);
}
when(InitializeRatekeeperRequest req = waitNext(interf.ratekeeper.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Ratekeeper;
RatekeeperInterface recruited(locality, req.reqId);
recruited.initEndpoints();
if (rkInterf->get().present()) {
recruited = rkInterf->get().get();
TEST(true); // Recruited while already a ratekeeper.
} else {
startRole(Role::RATEKEEPER, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getRateInfo);
DUMPTOKEN(recruited.haltRatekeeper);
DUMPTOKEN(recruited.reportCommitCostEstimation);
Future<Void> ratekeeperProcess = ratekeeper(recruited, dbInfo);
errorForwarders.add(
forwardError(errors,
Role::RATEKEEPER,
recruited.id(),
setWhenDoneOrError(ratekeeperProcess, rkInterf, Optional<RatekeeperInterface>())));
rkInterf->set(Optional<RatekeeperInterface>(recruited));
}
TraceEvent("Ratekeeper_InitRequest", req.reqId).detail("RatekeeperId", recruited.id());
req.reply.send(recruited);
}
when(InitializeBlobManagerRequest req = waitNext(interf.blobManager.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobManager;
BlobManagerInterface recruited(locality, req.reqId);
recruited.initEndpoints();
if (bmInterf->get().present()) {
recruited = bmInterf->get().get();
TEST(true); // Recruited while already a blob manager.
} else {
startRole(Role::BLOB_MANAGER, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.haltBlobManager);
Future<Void> blobManagerProcess = blobManager(recruited, dbInfo, req.epoch);
errorForwarders.add(forwardError(
errors,
Role::BLOB_MANAGER,
recruited.id(),
setWhenDoneOrError(blobManagerProcess, bmInterf, Optional<BlobManagerInterface>())));
bmInterf->set(Optional<BlobManagerInterface>(recruited));
}
TraceEvent("BlobManagerReceived", req.reqId).detail("BlobManagerId", recruited.id());
req.reply.send(recruited);
}
when(InitializeBackupRequest req = waitNext(interf.backup.getFuture())) {
if (!backupWorkerCache.exists(req.reqId)) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Backup;
BackupInterface recruited(locality);
recruited.initEndpoints();
startRole(Role::BACKUP, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
ReplyPromise<InitializeBackupReply> backupReady = req.reply;
backupWorkerCache.set(req.reqId, backupReady.getFuture());
Future<Void> backupProcess = backupWorker(recruited, req, dbInfo);
// The reply future was cached in backupWorkerCache above, so remove it from that cache (not
// storageCache) once the backup worker is ready.
backupProcess = backupWorkerCache.removeOnReady(req.reqId, backupProcess);
errorForwarders.add(forwardError(errors, Role::BACKUP, recruited.id(), backupProcess));
TraceEvent("BackupInitRequest", req.reqId).detail("BackupId", recruited.id());
InitializeBackupReply reply(recruited, req.backupEpoch);
backupReady.send(reply);
} else {
forwardPromise(req.reply, backupWorkerCache.get(req.reqId));
}
}
when(InitializeEncryptKeyProxyRequest req = waitNext(interf.encryptKeyProxy.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::EncryptKeyProxy;
EncryptKeyProxyInterface recruited(locality, req.reqId);
recruited.initEndpoints();
if (ekpInterf->get().present()) {
recruited = ekpInterf->get().get();
TEST(true); // Recruited while already an encryptKeyProxy server.
} else {
startRole(Role::ENCRYPT_KEY_PROXY, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
Future<Void> encryptKeyProxyProcess = encryptKeyProxyServer(recruited, dbInfo);
errorForwarders.add(forwardError(
errors,
Role::ENCRYPT_KEY_PROXY,
recruited.id(),
setWhenDoneOrError(encryptKeyProxyProcess, ekpInterf, Optional<EncryptKeyProxyInterface>())));
ekpInterf->set(Optional<EncryptKeyProxyInterface>(recruited));
}
TraceEvent("EncryptKeyProxyReceived", req.reqId).detail("EncryptKeyProxyId", recruited.id());
req.reply.send(recruited);
}
when(InitializeTLogRequest req = waitNext(interf.tLog.getFuture())) {
// For now, there's a one-to-one mapping of spill type to TLogVersion.
// With future work, a particular version of the TLog can support multiple
// different spilling strategies, at which point SpillType will need to be
// plumbed down into tLogFn.
if (req.logVersion < TLogVersion::MIN_RECRUITABLE) {
TraceEvent(SevError, "InitializeTLogInvalidLogVersion")
.detail("Version", req.logVersion)
.detail("MinRecruitable", TLogVersion::MIN_RECRUITABLE);
req.reply.sendError(internal_error());
}
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::TLog;
TLogOptions tLogOptions(req.logVersion, req.spillType);
TLogFn tLogFn = tLogFnForOptions(tLogOptions);
auto& logData = sharedLogs[SharedLogsKey(tLogOptions, req.storeType)];
logData.requests.send(req);
if (!logData.actor.isValid() || logData.actor.isReady()) {
UID logId = deterministicRandom()->randomUniqueID();
std::map<std::string, std::string> details;
details["ForMaster"] = req.recruitmentID.shortString();
details["StorageEngine"] = req.storeType.toString();
// FIXME: start a role for every tlog instance, rather than just for the shared actor; also use a
// different role type for the shared actor
startRole(Role::SHARED_TRANSACTION_LOG, logId, interf.id(), details);
const StringRef prefix =
req.logVersion > TLogVersion::V2 ? fileVersionedLogDataPrefix : fileLogDataPrefix;
std::string filename =
filenameFromId(req.storeType, folder, prefix.toString() + tLogOptions.toPrefix(), logId);
IKeyValueStore* data = openKVStore(req.storeType, filename, logId, memoryLimit);
const DiskQueueVersion dqv =
tLogOptions.version >= TLogVersion::V3 ? DiskQueueVersion::V1 : DiskQueueVersion::V0;
IDiskQueue* queue = openDiskQueue(
joinPath(folder,
fileLogQueuePrefix.toString() + tLogOptions.toPrefix() + logId.toString() + "-"),
tlogQueueExtension.toString(),
logId,
dqv);
filesClosed.add(data->onClosed());
filesClosed.add(queue->onClosed());
Future<Void> tLogCore = tLogFn(data,
queue,
dbInfo,
locality,
logData.requests,
logId,
interf.id(),
false,
Promise<Void>(),
Promise<Void>(),
folder,
degraded,
activeSharedTLog);
tLogCore = handleIOErrors(tLogCore, data, logId);
tLogCore = handleIOErrors(tLogCore, queue, logId);
errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore));
logData.actor = tLogCore;
logData.uid = logId;
}
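// Whether or not a new SharedTLog was started, the instance serving this recruitment becomes the
// active shared TLog.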
activeSharedTLog->set(logData.uid);
}
when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) {
// We want to prevent double recruiting on a worker unless we try to recruit something
// with a different storage engine (otherwise storage migration won't work for certain
// configurations). Additionally, we need to allow double recruitment for seed servers.
// The reason for this is that a storage server only removes itself after it has been able
// to read the system keyspace. But if recovery fails right after a `configure new ...`
// was run, it never gets the chance to do so.
if (!storageCache.exists(req.reqId) &&
(std::all_of(runningStorages.begin(),
runningStorages.end(),
[&req](const auto& p) { return p.second != req.storeType; }) ||
req.seedTag != invalidTag)) {
ASSERT(req.clusterId.isValid());
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
bool isTss = req.tssPairIDAndVersion.present();
StorageServerInterface recruited(req.interfaceId);
recruited.locality = locality;
recruited.tssPairID = isTss ? req.tssPairIDAndVersion.get().first : Optional<UID>();
recruited.initEndpoints();
std::map<std::string, std::string> details;
details["StorageEngine"] = req.storeType.toString();
details["IsTSS"] = std::to_string(isTss);
Role ssRole = isTss ? Role::TESTING_STORAGE_SERVER : Role::STORAGE_SERVER;
startRole(ssRole, recruited.id(), interf.id(), details);
DUMPTOKEN(recruited.getValue);
DUMPTOKEN(recruited.getKey);
DUMPTOKEN(recruited.getKeyValues);
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getReadHotRanges);
DUMPTOKEN(recruited.getRangeSplitPoints);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
DUMPTOKEN(recruited.getKeyValuesStream);
DUMPTOKEN(recruited.getKeyValuesAndFlatMap);
// printf("Recruited as storageServer\n");
std::string filename =
filenameFromId(req.storeType,
folder,
isTss ? testingStoragePrefix.toString() : fileStoragePrefix.toString(),
recruited.id());
IKeyValueStore* data = openKVStore(req.storeType, filename, recruited.id(), memoryLimit);
Future<Void> kvClosed = data->onClosed();
filesClosed.add(kvClosed);
ReplyPromise<InitializeStorageReply> storageReady = req.reply;
storageCache.set(req.reqId, storageReady.getFuture());
Future<Void> s = storageServer(data,
recruited,
req.seedTag,
req.clusterId,
isTss ? req.tssPairIDAndVersion.get().second : 0,
storageReady,
dbInfo,
folder);
s = handleIOErrors(s, data, recruited.id(), kvClosed);
s = storageCache.removeOnReady(req.reqId, s);
s = storageServerRollbackRebooter(&runningStorages,
s,
req.storeType,
filename,
recruited.id(),
recruited.locality,
isTss,
dbInfo,
folder,
&filesClosed,
memoryLimit,
data);
errorForwarders.add(forwardError(errors, ssRole, recruited.id(), s));
} else if (storageCache.exists(req.reqId)) {
forwardPromise(req.reply, storageCache.get(req.reqId));
} else {
TraceEvent("AttemptedDoubleRecruitment", interf.id()).detail("ForRole", "StorageServer");
errorForwarders.add(map(delay(0.5), [reply = req.reply](Void) {
reply.sendError(recruitment_failed());
return Void();
}));
}
}
when(InitializeBlobWorkerRequest req = waitNext(interf.blobWorker.getFuture())) {
BlobWorkerInterface recruited(locality, req.interfaceId);
recruited.initEndpoints();
startRole(Role::BLOB_WORKER, recruited.id(), interf.id());
ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady = req.reply;
Future<Void> bw = blobWorker(recruited, blobWorkerReady, dbInfo);
errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw));
}
when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::CommitProxy;
CommitProxyInterface recruited;
recruited.processId = locality.processId();
recruited.provisional = false;
recruited.initEndpoints();
std::map<std::string, std::string> details;
details["ForMaster"] = req.master.id().shortString();
startRole(Role::COMMIT_PROXY, recruited.id(), interf.id(), details);
DUMPTOKEN(recruited.commit);
DUMPTOKEN(recruited.getConsistentReadVersion);
DUMPTOKEN(recruited.getKeyServersLocations);
DUMPTOKEN(recruited.getStorageServerRejoinInfo);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.txnState);
// printf("Recruited as commitProxyServer\n");
errorForwarders.add(zombie(recruited,
forwardError(errors,
Role::COMMIT_PROXY,
recruited.id(),
commitProxyServer(recruited, req, dbInfo, whitelistBinPaths))));
req.reply.send(recruited);
}
when(InitializeGrvProxyRequest req = waitNext(interf.grvProxy.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::GrvProxy;
GrvProxyInterface recruited;
recruited.processId = locality.processId();
recruited.provisional = false;
recruited.initEndpoints();
std::map<std::string, std::string> details;
details["ForMaster"] = req.master.id().shortString();
startRole(Role::GRV_PROXY, recruited.id(), interf.id(), details);
DUMPTOKEN(recruited.getConsistentReadVersion);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getHealthMetrics);
// printf("Recruited as grvProxyServer\n");
errorForwarders.add(zombie(
recruited,
forwardError(errors, Role::GRV_PROXY, recruited.id(), grvProxyServer(recruited, req, dbInfo))));
req.reply.send(recruited);
}
when(InitializeResolverRequest req = waitNext(interf.resolver.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Resolver;
ResolverInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
std::map<std::string, std::string> details;
startRole(Role::RESOLVER, recruited.id(), interf.id(), details);
DUMPTOKEN(recruited.resolve);
DUMPTOKEN(recruited.metrics);
DUMPTOKEN(recruited.split);
DUMPTOKEN(recruited.waitFailure);
errorForwarders.add(zombie(
recruited, forwardError(errors, Role::RESOLVER, recruited.id(), resolver(recruited, req, dbInfo))));
req.reply.send(recruited);
}
when(InitializeLogRouterRequest req = waitNext(interf.logRouter.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::LogRouter;
TLogInterface recruited(locality);
recruited.initEndpoints();
std::map<std::string, std::string> details;
startRole(Role::LOG_ROUTER, recruited.id(), interf.id(), details);
DUMPTOKEN(recruited.peekMessages);
DUMPTOKEN(recruited.peekStreamMessages);
DUMPTOKEN(recruited.popMessages);
DUMPTOKEN(recruited.commit);
DUMPTOKEN(recruited.lock);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.confirmRunning);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.recoveryFinished);
DUMPTOKEN(recruited.disablePopRequest);
DUMPTOKEN(recruited.enablePopRequest);
DUMPTOKEN(recruited.snapRequest);
errorForwarders.add(
zombie(recruited,
forwardError(errors, Role::LOG_ROUTER, recruited.id(), logRouter(recruited, req, dbInfo))));
req.reply.send(recruited);
}
when(CoordinationPingMessage m = waitNext(interf.coordinationPing.getFuture())) {
TraceEvent("CoordinationPing", interf.id())
.detail("CCID", m.clusterControllerId)
.detail("TimeStep", m.timeStep);
}
when(SetMetricsLogRateRequest req = waitNext(interf.setMetricsRate.getFuture())) {
TraceEvent("LoggingRateChange", interf.id())
.detail("OldDelay", loggingDelay)
.detail("NewLogPS", req.metricsLogsPerSecond);
if (req.metricsLogsPerSecond != 0) {
loggingDelay = 1.0 / req.metricsLogsPerSecond;
loggingTrigger = Void();
}
}
when(EventLogRequest req = waitNext(interf.eventLogRequest.getFuture())) {
TraceEventFields e;
if (req.getLastError)
e = latestEventCache.getLatestError();
else
e = latestEventCache.get(req.eventName.toString());
req.reply.send(e);
}
when(TraceBatchDumpRequest req = waitNext(interf.traceBatchDumpRequest.getFuture())) {
g_traceBatch.dump();
req.reply.send(Void());
}
when(DiskStoreRequest req = waitNext(interf.diskStoreRequest.getFuture())) {
Standalone<VectorRef<UID>> ids;
for (DiskStore d : getDiskStores(folder)) {
bool included = true;
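// Without includePartialStores, only report stores whose auxiliary recovery files (WAL, pager logs,
// or disk queue files, depending on the engine) are present.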
if (!req.includePartialStores) {
if (d.storeType == KeyValueStoreType::SSD_BTREE_V1) {
included = fileExists(d.filename + ".fdb-wal");
} else if (d.storeType == KeyValueStoreType::SSD_BTREE_V2) {
included = fileExists(d.filename + ".sqlite-wal");
} else if (d.storeType == KeyValueStoreType::SSD_REDWOOD_V1) {
included = fileExists(d.filename + "0.pagerlog") && fileExists(d.filename + "1.pagerlog");
} else if (d.storeType == KeyValueStoreType::SSD_ROCKSDB_V1) {
included = fileExists(joinPath(d.filename, "CURRENT")) &&
fileExists(joinPath(d.filename, "IDENTITY"));
} else if (d.storeType == KeyValueStoreType::MEMORY) {
included = fileExists(d.filename + "1.fdq");
} else {
ASSERT(d.storeType == KeyValueStoreType::MEMORY_RADIXTREE);
included = fileExists(d.filename + "1.fdr");
}
if (d.storedComponent == DiskStore::COMPONENT::TLogData && included) {
included = false;
// The previous code assumed that d.filename is a filename. But that is not true;
// d.filename is a path. Removing a prefix and adding a new one just makes a broken
// directory name, so fileExists would always return false.
// Weirdly, this doesn't break anything, as tested by taking a clean checkout of FDB,
// setting included to false always, and then running correctness. So I'm just
// improving the situation by actually marking it as broken.
// FIXME: this whole thing
/*
std::string logDataBasename;
StringRef filename = d.filename;
if (filename.startsWith(fileLogDataPrefix)) {
    logDataBasename = fileLogQueuePrefix.toString() + d.filename.substr(fileLogDataPrefix.size());
} else {
    StringRef optionsString = filename.removePrefix(fileVersionedLogDataPrefix).eat("-");
    logDataBasename = fileLogQueuePrefix.toString() + optionsString.toString() + "-";
}
TraceEvent("DiskStoreRequest").detail("FilenameBasename", logDataBasename);
if (fileExists(logDataBasename + "0.fdq") && fileExists(logDataBasename + "1.fdq")) {
    included = true;
}
*/
}
}
if (included) {
ids.push_back(ids.arena(), d.storeID);
}
}
req.reply.send(ids);
}
when(wait(loggingTrigger)) {
systemMonitor();
loggingTrigger = delay(loggingDelay, TaskPriority::FlushTrace);
}
when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) {
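// Coordinator state lives in its own folder; snapshot that folder when the request targets the
// "coord" role.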
Standalone<StringRef> snapFolder = StringRef(folder);
if (snapReq.role.toString() == "coord") {
snapFolder = coordFolder;
}
errorForwarders.add(workerSnapCreate(snapReq, snapFolder));
}
when(wait(errorForwarders.getResult())) {}
when(wait(handleErrors)) {}
}
} catch (Error& err) {
// Make sure actors are cancelled before "recovery" promises are destructed.
for (auto f : recoveries)
f.cancel();
state Error e = err;
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_actor_cancelled ||
e.code() == error_code_please_reboot_delete;
endRole(Role::WORKER, interf.id(), "WorkerError", ok, e);
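// Drop all outstanding role actors and shared TLogs before waiting for the underlying stores to
// close.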
errorForwarders.clear(false);
sharedLogs.clear();
// We get cancelled e.g. when an entire simulation times out, but in that case we won't be
// restarted and don't need to wait for shutdown.
if (e.code() != error_code_actor_cancelled) {
stopping.send(Void());
wait(filesClosed.getResult()); // Wait for complete shutdown of KV stores
wait(delay(0.0)); // Unwind the callstack to make sure that IAsyncFile references are all gone
TraceEvent(SevInfo, "WorkerShutdownComplete", interf.id());
}
throw e;
}
}