ACTOR Future workerServer()

in fdbserver/worker.actor.cpp [1366:2293]


ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                                Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
                                LocalityData locality,
                                Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
                                ProcessClass initialClass,
                                std::string folder,
                                int64_t memoryLimit,
                                std::string metricsConnFile,
                                std::string metricsPrefix,
                                Promise<Void> recoveredDiskFiles,
                                int64_t memoryProfileThreshold,
                                std::string _coordFolder,
                                std::string whitelistBinPaths,
                                Reference<AsyncVar<ServerDBInfo>> dbInfo,
                                ConfigDBType configDBType,
                                Reference<LocalConfiguration> localConfig) {
	state PromiseStream<ErrorInfo> errors;
	state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
	    new AsyncVar<Optional<DataDistributorInterface>>());
	state Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf(new AsyncVar<Optional<RatekeeperInterface>>());
	state Reference<AsyncVar<Optional<BlobManagerInterface>>> bmInterf(new AsyncVar<Optional<BlobManagerInterface>>());
	state Reference<AsyncVar<Optional<EncryptKeyProxyInterface>>> ekpInterf(
	    new AsyncVar<Optional<EncryptKeyProxyInterface>>());
	state Future<Void> handleErrors = workerHandleErrors(errors.getFuture()); // Needs to be stopped last
	state ActorCollection errorForwarders(false);
	state Future<Void> loggingTrigger = Void();
	state double loggingDelay = SERVER_KNOBS->WORKER_LOGGING_INTERVAL;
	state ActorCollection filesClosed(true);
	state Promise<Void> stopping;
	state WorkerCache<InitializeStorageReply> storageCache;
	state Future<Void> metricsLogger;
	state Future<Void> chaosMetricsActor;
	state Reference<AsyncVar<bool>> degraded = FlowTransport::transport().getDegraded();
	// tLogFnForOptions() can return a function that doesn't correspond with the FDB version that the
	// TLogVersion represents.  This can be done if the newer TLog doesn't support a requested option.
	// As (store type, spill type) can map to the same TLogFn across multiple TLogVersions, we need to
	// decide if we should collapse them into the same SharedTLog instance as well.  The answer
	// here is no, so that when running with log_version==3, all files should say V=3.
	state std::map<SharedLogsKey, SharedLogsValue> sharedLogs;
	state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
	state WorkerCache<InitializeBackupReply> backupWorkerCache;

	state std::string coordFolder = abspath(_coordFolder);

	state WorkerInterface interf(locality);
	state std::set<std::pair<UID, KeyValueStoreType>> runningStorages;
	interf.initEndpoints();

	state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());

	if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
		TraceEvent(SevInfo, "ChaosFeaturesEnabled");
		chaosMetricsActor = chaosMetricsLogger();
	}

	folder = abspath(folder);

	if (metricsPrefix.size() > 0) {
		if (metricsConnFile.size() > 0) {
			try {
				state Database db =
				    Database::createDatabase(metricsConnFile, Database::API_VERSION_LATEST, IsInternal::True, locality);
				metricsLogger = runMetrics(db, KeyRef(metricsPrefix));
			} catch (Error& e) {
				TraceEvent(SevWarnAlways, "TDMetricsBadClusterFile").error(e).detail("ConnFile", metricsConnFile);
			}
		} else {
			auto lockAware = metricsPrefix.size() && metricsPrefix[0] == '\xff' ? LockAware::True : LockAware::False;
			metricsLogger =
			    runMetrics(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, lockAware), KeyRef(metricsPrefix));
		}

		GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
	}

	errorForwarders.add(resetAfter(degraded,
	                               SERVER_KNOBS->DEGRADED_RESET_INTERVAL,
	                               false,
	                               SERVER_KNOBS->DEGRADED_WARNING_LIMIT,
	                               SERVER_KNOBS->DEGRADED_WARNING_RESET_DELAY,
	                               "DegradedReset"));
	errorForwarders.add(loadedPonger(interf.debugPing.getFuture()));
	errorForwarders.add(waitFailureServer(interf.waitFailure.getFuture()));
	errorForwarders.add(monitorTraceLogIssues(issues));
	errorForwarders.add(testerServerCore(interf.testerInterface, connRecord, dbInfo, locality));
	errorForwarders.add(monitorHighMemory(memoryProfileThreshold));

	filesClosed.add(stopping.getFuture());

	initializeSystemMonitorMachineState(SystemMonitorMachineState(
	    folder, locality.dcId(), locality.zoneId(), locality.machineId(), g_network->getLocalAddress().ip));

	{
		auto recruited = interf;
		DUMPTOKEN(recruited.clientInterface.reboot);
		DUMPTOKEN(recruited.clientInterface.profiler);
		DUMPTOKEN(recruited.tLog);
		DUMPTOKEN(recruited.master);
		DUMPTOKEN(recruited.commitProxy);
		DUMPTOKEN(recruited.grvProxy);
		DUMPTOKEN(recruited.resolver);
		DUMPTOKEN(recruited.storage);
		DUMPTOKEN(recruited.debugPing);
		DUMPTOKEN(recruited.coordinationPing);
		DUMPTOKEN(recruited.waitFailure);
		DUMPTOKEN(recruited.setMetricsRate);
		DUMPTOKEN(recruited.eventLogRequest);
		DUMPTOKEN(recruited.traceBatchDumpRequest);
		DUMPTOKEN(recruited.updateServerDBInfo);
	}

	state std::vector<Future<Void>> recoveries;

	try {
		std::vector<DiskStore> stores = getDiskStores(folder);
		bool validateDataFiles = deleteFile(joinPath(folder, validationFilename));
		for (int f = 0; f < stores.size(); f++) {
			DiskStore s = stores[f];
			// FIXME: Error handling
			if (s.storedComponent == DiskStore::Storage) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
				IKeyValueStore* kv =
				    openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, false, validateDataFiles);
				Future<Void> kvClosed = kv->onClosed();
				filesClosed.add(kvClosed);

				// std::string doesn't have startsWith
				std::string tssPrefix = testingStoragePrefix.toString();
				// TODO might be more efficient to mark a boolean on DiskStore in getDiskStores, but that kind of breaks
				// the abstraction since DiskStore also applies to storage cache + tlog
				bool isTss = s.filename.find(tssPrefix) != std::string::npos;
				Role ssRole = isTss ? Role::TESTING_STORAGE_SERVER : Role::STORAGE_SERVER;

				StorageServerInterface recruited;
				recruited.uniqueID = s.storeID;
				recruited.locality = locality;
				recruited.tssPairID =
				    isTss ? Optional<UID>(UID())
				          : Optional<UID>(); // presence of optional is used as source of truth for tss vs not. Value
				                             // gets overridden later in restoreDurableState
				recruited.initEndpoints();

				std::map<std::string, std::string> details;
				details["StorageEngine"] = s.storeType.toString();
				details["IsTSS"] = isTss ? "Yes" : "No";

				startRole(ssRole, recruited.id(), interf.id(), details, "Restored");

				DUMPTOKEN(recruited.getValue);
				DUMPTOKEN(recruited.getKey);
				DUMPTOKEN(recruited.getKeyValues);
				DUMPTOKEN(recruited.getShardState);
				DUMPTOKEN(recruited.waitMetrics);
				DUMPTOKEN(recruited.splitMetrics);
				DUMPTOKEN(recruited.getReadHotRanges);
				DUMPTOKEN(recruited.getRangeSplitPoints);
				DUMPTOKEN(recruited.getStorageMetrics);
				DUMPTOKEN(recruited.waitFailure);
				DUMPTOKEN(recruited.getQueuingMetrics);
				DUMPTOKEN(recruited.getKeyValueStoreType);
				DUMPTOKEN(recruited.watchValue);
				DUMPTOKEN(recruited.getKeyValuesStream);
				DUMPTOKEN(recruited.getKeyValuesAndFlatMap);

				Promise<Void> recovery;
				Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connRecord);
				recoveries.push_back(recovery.getFuture());
				f = handleIOErrors(f, kv, s.storeID, kvClosed);
				f = storageServerRollbackRebooter(&runningStorages,
				                                  f,
				                                  s.storeType,
				                                  s.filename,
				                                  recruited.id(),
				                                  recruited.locality,
				                                  isTss,
				                                  dbInfo,
				                                  folder,
				                                  &filesClosed,
				                                  memoryLimit,
				                                  kv);
				errorForwarders.add(forwardError(errors, ssRole, recruited.id(), f));
			} else if (s.storedComponent == DiskStore::TLogData) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::TLog;
				std::string logQueueBasename;
				const std::string filename = basename(s.filename);
				if (StringRef(filename).startsWith(fileLogDataPrefix)) {
					logQueueBasename = fileLogQueuePrefix.toString();
				} else {
					StringRef optionsString = StringRef(filename).removePrefix(fileVersionedLogDataPrefix).eat("-");
					logQueueBasename = fileLogQueuePrefix.toString() + optionsString.toString() + "-";
				}
				ASSERT_WE_THINK(abspath(parentDirectory(s.filename)) == folder);
				IKeyValueStore* kv = openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles);
				const DiskQueueVersion dqv =
				    s.tLogOptions.version >= TLogVersion::V3 ? DiskQueueVersion::V1 : DiskQueueVersion::V0;
				const int64_t diskQueueWarnSize =
				    s.tLogOptions.spillType == TLogSpillType::VALUE ? 10 * SERVER_KNOBS->TARGET_BYTES_PER_TLOG : -1;
				IDiskQueue* queue = openDiskQueue(joinPath(folder, logQueueBasename + s.storeID.toString() + "-"),
				                                  tlogQueueExtension.toString(),
				                                  s.storeID,
				                                  dqv,
				                                  diskQueueWarnSize);
				filesClosed.add(kv->onClosed());
				filesClosed.add(queue->onClosed());

				std::map<std::string, std::string> details;
				details["StorageEngine"] = s.storeType.toString();
				startRole(Role::SHARED_TRANSACTION_LOG, s.storeID, interf.id(), details, "Restored");

				Promise<Void> oldLog;
				Promise<Void> recovery;
				TLogFn tLogFn = tLogFnForOptions(s.tLogOptions);
				auto& logData = sharedLogs[SharedLogsKey(s.tLogOptions, s.storeType)];
				// FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we
				// be sending a fake InitializeTLogRequest rather than calling tLog() ?
				Future<Void> tl =
				    tLogFn(kv,
				           queue,
				           dbInfo,
				           locality,
				           !logData.actor.isValid() || logData.actor.isReady() ? logData.requests
				                                                               : PromiseStream<InitializeTLogRequest>(),
				           s.storeID,
				           interf.id(),
				           true,
				           oldLog,
				           recovery,
				           folder,
				           degraded,
				           activeSharedTLog);
				recoveries.push_back(recovery.getFuture());
				activeSharedTLog->set(s.storeID);

				tl = handleIOErrors(tl, kv, s.storeID);
				tl = handleIOErrors(tl, queue, s.storeID);
				if (!logData.actor.isValid() || logData.actor.isReady()) {
					logData.actor = oldLog.getFuture() || tl;
					logData.uid = s.storeID;
				}
				errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl));
			}
		}

		bool hasCache = false;
		//  start cache role if we have the right process class
		if (initialClass.classType() == ProcessClass::StorageCacheClass) {
			hasCache = true;
			StorageServerInterface recruited;
			recruited.locality = locality;
			recruited.initEndpoints();

			std::map<std::string, std::string> details;
			startRole(Role::STORAGE_CACHE, recruited.id(), interf.id(), details);

			// DUMPTOKEN(recruited.getVersion);
			DUMPTOKEN(recruited.getValue);
			DUMPTOKEN(recruited.getKey);
			DUMPTOKEN(recruited.getKeyValues);
			DUMPTOKEN(recruited.getKeyValuesAndFlatMap);
			DUMPTOKEN(recruited.getShardState);
			DUMPTOKEN(recruited.waitMetrics);
			DUMPTOKEN(recruited.splitMetrics);
			DUMPTOKEN(recruited.getStorageMetrics);
			DUMPTOKEN(recruited.waitFailure);
			DUMPTOKEN(recruited.getQueuingMetrics);
			DUMPTOKEN(recruited.getKeyValueStoreType);
			DUMPTOKEN(recruited.watchValue);

			auto f = storageCacheServer(recruited, 0, dbInfo);
			f = storageCacheRollbackRebooter(f, recruited.id(), recruited.locality, dbInfo);
			errorForwarders.add(forwardError(errors, Role::STORAGE_CACHE, recruited.id(), f));
		}

		std::map<std::string, std::string> details;
		details["Locality"] = locality.toString();
		details["DataFolder"] = folder;
		details["StoresPresent"] = format("%d", stores.size());
		details["CachePresent"] = hasCache ? "true" : "false";
		startRole(Role::WORKER, interf.id(), interf.id(), details);
		errorForwarders.add(traceRole(Role::WORKER, interf.id()));

		wait(waitForAll(recoveries));
		recoveredDiskFiles.send(Void());

		errorForwarders.add(registrationClient(ccInterface,
		                                       interf,
		                                       asyncPriorityInfo,
		                                       initialClass,
		                                       ddInterf,
		                                       rkInterf,
		                                       bmInterf,
		                                       ekpInterf,
		                                       degraded,
		                                       connRecord,
		                                       issues,
		                                       localConfig,
		                                       dbInfo));

		if (configDBType != ConfigDBType::DISABLED) {
			errorForwarders.add(localConfig->consume(interf.configBroadcastInterface));
		}

		if (SERVER_KNOBS->ENABLE_WORKER_HEALTH_MONITOR) {
			errorForwarders.add(healthMonitor(ccInterface, interf, locality, dbInfo));
		}

		TraceEvent("RecoveriesComplete", interf.id());

		loop choose {
			when(UpdateServerDBInfoRequest req = waitNext(interf.updateServerDBInfo.getFuture())) {
				ServerDBInfo localInfo = BinaryReader::fromStringRef<ServerDBInfo>(
				    req.serializedDbInfo, AssumeVersion(g_network->protocolVersion()));
				localInfo.myLocality = locality;

				if (localInfo.infoGeneration < dbInfo->get().infoGeneration &&
				    localInfo.clusterInterface == dbInfo->get().clusterInterface) {
					std::vector<Endpoint> rep = req.broadcastInfo;
					rep.push_back(interf.updateServerDBInfo.getEndpoint());
					req.reply.send(rep);
				} else {
					Optional<Endpoint> notUpdated;
					if (!ccInterface->get().present() || localInfo.clusterInterface != ccInterface->get().get()) {
						notUpdated = interf.updateServerDBInfo.getEndpoint();
					} else if (localInfo.infoGeneration > dbInfo->get().infoGeneration ||
					           dbInfo->get().clusterInterface != ccInterface->get().get()) {
						TraceEvent("GotServerDBInfoChange")
						    .detail("ChangeID", localInfo.id)
						    .detail("InfoGeneration", localInfo.infoGeneration)
						    .detail("MasterID", localInfo.master.id())
						    .detail("RatekeeperID",
						            localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
						    .detail("DataDistributorID",
						            localInfo.distributor.present() ? localInfo.distributor.get().id() : UID())
						    .detail("BlobManagerID",
						            localInfo.blobManager.present() ? localInfo.blobManager.get().id() : UID())
						    .detail("EncryptKeyProxyID",
						            localInfo.encryptKeyProxy.present() ? localInfo.encryptKeyProxy.get().id() : UID());

						dbInfo->set(localInfo);
					}
					errorForwarders.add(
					    success(broadcastDBInfoRequest(req, SERVER_KNOBS->DBINFO_SEND_AMOUNT, notUpdated, true)));
				}
			}
			when(RebootRequest req = waitNext(interf.clientInterface.reboot.getFuture())) {
				state RebootRequest rebootReq = req;
				// If suspendDuration is INT_MAX, the trace will not be logged if it was inside the next block
				// Also a useful trace to have even if suspendDuration is 0
				TraceEvent("RebootRequestSuspendingProcess").detail("Duration", req.waitForDuration);
				if (req.waitForDuration) {
					flushTraceFileVoid();
					setProfilingEnabled(0);
					g_network->stop();
					threadSleep(req.waitForDuration);
				}
				if (rebootReq.checkData) {
					Reference<IAsyncFile> checkFile =
					    wait(IAsyncFileSystem::filesystem()->open(joinPath(folder, validationFilename),
					                                              IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE,
					                                              0600));
					wait(checkFile->sync());
				}

				if (g_network->isSimulated()) {
					TraceEvent("SimulatedReboot").detail("Deletion", rebootReq.deleteData);
					if (rebootReq.deleteData) {
						throw please_reboot_delete();
					}
					throw please_reboot();
				} else {
					TraceEvent("ProcessReboot").log();
					ASSERT(!rebootReq.deleteData);
					flushAndExit(0);
				}
			}
			when(SetFailureInjection req = waitNext(interf.clientInterface.setFailureInjection.getFuture())) {
				if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
					if (req.diskFailure.present()) {
						auto diskFailureInjector = DiskFailureInjector::injector();
						diskFailureInjector->setDiskFailure(req.diskFailure.get().stallInterval,
						                                    req.diskFailure.get().stallPeriod,
						                                    req.diskFailure.get().throttlePeriod);
					} else if (req.flipBits.present()) {
						auto bitFlipper = BitFlipper::flipper();
						bitFlipper->setBitFlipPercentage(req.flipBits.get().percentBitFlips);
					}
					req.reply.send(Void());
				} else {
					req.reply.sendError(client_invalid_operation());
				}
			}
			when(ProfilerRequest req = waitNext(interf.clientInterface.profiler.getFuture())) {
				state ProfilerRequest profilerReq = req;
				// There really isn't a great "filepath sanitizer" or "filepath escape" function available,
				// thus we instead enforce a different requirement.  One can only write to a file that's
				// beneath the working directory, and we remove the ability to do any symlink or ../..
				// tricks by resolving all paths through `abspath` first.
				try {
					std::string realLogDir = abspath(SERVER_KNOBS->LOG_DIRECTORY);
					std::string realOutPath = abspath(realLogDir + "/" + profilerReq.outputFile.toString());
					if (realLogDir.size() < realOutPath.size() &&
					    strncmp(realLogDir.c_str(), realOutPath.c_str(), realLogDir.size()) == 0) {
						profilerReq.outputFile = realOutPath;
						uncancellable(runProfiler(profilerReq));
						profilerReq.reply.send(Void());
					} else {
						profilerReq.reply.sendError(client_invalid_operation());
					}
				} catch (Error& e) {
					profilerReq.reply.sendError(e);
				}
			}
			when(RecruitMasterRequest req = waitNext(interf.master.getFuture())) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Master;
				MasterInterface recruited;
				recruited.locality = locality;
				recruited.initEndpoints();

				startRole(Role::MASTER, recruited.id(), interf.id());

				DUMPTOKEN(recruited.waitFailure);
				DUMPTOKEN(recruited.getCommitVersion);
				DUMPTOKEN(recruited.getLiveCommittedVersion);
				DUMPTOKEN(recruited.reportLiveCommittedVersion);
				DUMPTOKEN(recruited.updateRecoveryData);

				// printf("Recruited as masterServer\n");
				Future<Void> masterProcess = masterServer(
				    recruited, dbInfo, ccInterface, ServerCoordinators(connRecord), req.lifetime, req.forceRecovery);
				errorForwarders.add(
				    zombie(recruited, forwardError(errors, Role::MASTER, recruited.id(), masterProcess)));
				req.reply.send(recruited);
			}
			when(InitializeDataDistributorRequest req = waitNext(interf.dataDistributor.getFuture())) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::DataDistributor;
				DataDistributorInterface recruited(locality, req.reqId);
				recruited.initEndpoints();

				if (ddInterf->get().present()) {
					recruited = ddInterf->get().get();
					TEST(true); // Recruited while already a data distributor.
				} else {
					startRole(Role::DATA_DISTRIBUTOR, recruited.id(), interf.id());
					DUMPTOKEN(recruited.waitFailure);

					Future<Void> dataDistributorProcess = dataDistributor(recruited, dbInfo);
					errorForwarders.add(forwardError(
					    errors,
					    Role::DATA_DISTRIBUTOR,
					    recruited.id(),
					    setWhenDoneOrError(dataDistributorProcess, ddInterf, Optional<DataDistributorInterface>())));
					ddInterf->set(Optional<DataDistributorInterface>(recruited));
				}
				TraceEvent("DataDistributorReceived", req.reqId).detail("DataDistributorId", recruited.id());
				req.reply.send(recruited);
			}
			when(InitializeRatekeeperRequest req = waitNext(interf.ratekeeper.getFuture())) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Ratekeeper;
				RatekeeperInterface recruited(locality, req.reqId);
				recruited.initEndpoints();

				if (rkInterf->get().present()) {
					recruited = rkInterf->get().get();
					TEST(true); // Recruited while already a ratekeeper.
				} else {
					startRole(Role::RATEKEEPER, recruited.id(), interf.id());
					DUMPTOKEN(recruited.waitFailure);
					DUMPTOKEN(recruited.getRateInfo);
					DUMPTOKEN(recruited.haltRatekeeper);
					DUMPTOKEN(recruited.reportCommitCostEstimation);

					Future<Void> ratekeeperProcess = ratekeeper(recruited, dbInfo);
					errorForwarders.add(
					    forwardError(errors,
					                 Role::RATEKEEPER,
					                 recruited.id(),
					                 setWhenDoneOrError(ratekeeperProcess, rkInterf, Optional<RatekeeperInterface>())));
					rkInterf->set(Optional<RatekeeperInterface>(recruited));
				}
				TraceEvent("Ratekeeper_InitRequest", req.reqId).detail("RatekeeperId", recruited.id());
				req.reply.send(recruited);
			}
			when(InitializeBlobManagerRequest req = waitNext(interf.blobManager.getFuture())) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobManager;
				BlobManagerInterface recruited(locality, req.reqId);
				recruited.initEndpoints();

				if (bmInterf->get().present()) {
					recruited = bmInterf->get().get();
					TEST(true); // Recruited while already a blob manager.
				} else {
					startRole(Role::BLOB_MANAGER, recruited.id(), interf.id());
					DUMPTOKEN(recruited.waitFailure);
					DUMPTOKEN(recruited.haltBlobManager);

					Future<Void> blobManagerProcess = blobManager(recruited, dbInfo, req.epoch);
					errorForwarders.add(forwardError(
					    errors,
					    Role::BLOB_MANAGER,
					    recruited.id(),
					    setWhenDoneOrError(blobManagerProcess, bmInterf, Optional<BlobManagerInterface>())));
					bmInterf->set(Optional<BlobManagerInterface>(recruited));
				}
				TraceEvent("BlobManagerReceived", req.reqId).detail("BlobManagerId", recruited.id());
				req.reply.send(recruited);
			}
			when(InitializeBackupRequest req = waitNext(interf.backup.getFuture())) {
				if (!backupWorkerCache.exists(req.reqId)) {
					LocalLineage _;
					getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Backup;
					BackupInterface recruited(locality);
					recruited.initEndpoints();

					startRole(Role::BACKUP, recruited.id(), interf.id());
					DUMPTOKEN(recruited.waitFailure);

					ReplyPromise<InitializeBackupReply> backupReady = req.reply;
					backupWorkerCache.set(req.reqId, backupReady.getFuture());
					Future<Void> backupProcess = backupWorker(recruited, req, dbInfo);
					backupProcess = storageCache.removeOnReady(req.reqId, backupProcess);
					errorForwarders.add(forwardError(errors, Role::BACKUP, recruited.id(), backupProcess));
					TraceEvent("BackupInitRequest", req.reqId).detail("BackupId", recruited.id());
					InitializeBackupReply reply(recruited, req.backupEpoch);
					backupReady.send(reply);
				} else {
					forwardPromise(req.reply, backupWorkerCache.get(req.reqId));
				}
			}
			when(InitializeEncryptKeyProxyRequest req = waitNext(interf.encryptKeyProxy.getFuture())) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::EncryptKeyProxy;
				EncryptKeyProxyInterface recruited(locality, req.reqId);
				recruited.initEndpoints();

				if (ekpInterf->get().present()) {
					recruited = ekpInterf->get().get();
					TEST(true); // Recruited while already a encryptKeyProxy server.
				} else {
					startRole(Role::ENCRYPT_KEY_PROXY, recruited.id(), interf.id());
					DUMPTOKEN(recruited.waitFailure);

					Future<Void> encryptKeyProxyProcess = encryptKeyProxyServer(recruited, dbInfo);
					errorForwarders.add(forwardError(
					    errors,
					    Role::ENCRYPT_KEY_PROXY,
					    recruited.id(),
					    setWhenDoneOrError(encryptKeyProxyProcess, ekpInterf, Optional<EncryptKeyProxyInterface>())));
					ekpInterf->set(Optional<EncryptKeyProxyInterface>(recruited));
				}
				TraceEvent("EncryptKeyProxyReceived", req.reqId).detail("EncryptKeyProxyId", recruited.id());
				req.reply.send(recruited);
			}
			when(InitializeTLogRequest req = waitNext(interf.tLog.getFuture())) {
				// For now, there's a one-to-one mapping of spill type to TLogVersion.
				// With future work, a particular version of the TLog can support multiple
				// different spilling strategies, at which point SpillType will need to be
				// plumbed down into tLogFn.
				if (req.logVersion < TLogVersion::MIN_RECRUITABLE) {
					TraceEvent(SevError, "InitializeTLogInvalidLogVersion")
					    .detail("Version", req.logVersion)
					    .detail("MinRecruitable", TLogVersion::MIN_RECRUITABLE);
					req.reply.sendError(internal_error());
				}
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::TLog;
				TLogOptions tLogOptions(req.logVersion, req.spillType);
				TLogFn tLogFn = tLogFnForOptions(tLogOptions);
				auto& logData = sharedLogs[SharedLogsKey(tLogOptions, req.storeType)];
				logData.requests.send(req);
				if (!logData.actor.isValid() || logData.actor.isReady()) {
					UID logId = deterministicRandom()->randomUniqueID();
					std::map<std::string, std::string> details;
					details["ForMaster"] = req.recruitmentID.shortString();
					details["StorageEngine"] = req.storeType.toString();

					// FIXME: start role for every tlog instance, rather that just for the shared actor, also use a
					// different role type for the shared actor
					startRole(Role::SHARED_TRANSACTION_LOG, logId, interf.id(), details);

					const StringRef prefix =
					    req.logVersion > TLogVersion::V2 ? fileVersionedLogDataPrefix : fileLogDataPrefix;
					std::string filename =
					    filenameFromId(req.storeType, folder, prefix.toString() + tLogOptions.toPrefix(), logId);
					IKeyValueStore* data = openKVStore(req.storeType, filename, logId, memoryLimit);
					const DiskQueueVersion dqv =
					    tLogOptions.version >= TLogVersion::V3 ? DiskQueueVersion::V1 : DiskQueueVersion::V0;
					IDiskQueue* queue = openDiskQueue(
					    joinPath(folder,
					             fileLogQueuePrefix.toString() + tLogOptions.toPrefix() + logId.toString() + "-"),
					    tlogQueueExtension.toString(),
					    logId,
					    dqv);
					filesClosed.add(data->onClosed());
					filesClosed.add(queue->onClosed());

					Future<Void> tLogCore = tLogFn(data,
					                               queue,
					                               dbInfo,
					                               locality,
					                               logData.requests,
					                               logId,
					                               interf.id(),
					                               false,
					                               Promise<Void>(),
					                               Promise<Void>(),
					                               folder,
					                               degraded,
					                               activeSharedTLog);
					tLogCore = handleIOErrors(tLogCore, data, logId);
					tLogCore = handleIOErrors(tLogCore, queue, logId);
					errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore));
					logData.actor = tLogCore;
					logData.uid = logId;
				}
				activeSharedTLog->set(logData.uid);
			}
			when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) {
				// We want to prevent double recruiting on a worker unless we try to recruit something
				// with a different storage engine (otherwise storage migration won't work for certain
				// configuration). Additionally we also need to allow double recruitment for seed servers.
				// The reason for this is that a storage will only remove itself if after it was able
				// to read the system key space. But if recovery fails right after a `configure new ...`
				// was run it won't be able to do so.
				if (!storageCache.exists(req.reqId) &&
				    (std::all_of(runningStorages.begin(),
				                 runningStorages.end(),
				                 [&req](const auto& p) { return p.second != req.storeType; }) ||
				     req.seedTag != invalidTag)) {
					ASSERT(req.clusterId.isValid());
					LocalLineage _;
					getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
					bool isTss = req.tssPairIDAndVersion.present();
					StorageServerInterface recruited(req.interfaceId);
					recruited.locality = locality;
					recruited.tssPairID = isTss ? req.tssPairIDAndVersion.get().first : Optional<UID>();
					recruited.initEndpoints();

					std::map<std::string, std::string> details;
					details["StorageEngine"] = req.storeType.toString();
					details["IsTSS"] = std::to_string(isTss);
					Role ssRole = isTss ? Role::TESTING_STORAGE_SERVER : Role::STORAGE_SERVER;
					startRole(ssRole, recruited.id(), interf.id(), details);

					DUMPTOKEN(recruited.getValue);
					DUMPTOKEN(recruited.getKey);
					DUMPTOKEN(recruited.getKeyValues);
					DUMPTOKEN(recruited.getShardState);
					DUMPTOKEN(recruited.waitMetrics);
					DUMPTOKEN(recruited.splitMetrics);
					DUMPTOKEN(recruited.getReadHotRanges);
					DUMPTOKEN(recruited.getRangeSplitPoints);
					DUMPTOKEN(recruited.getStorageMetrics);
					DUMPTOKEN(recruited.waitFailure);
					DUMPTOKEN(recruited.getQueuingMetrics);
					DUMPTOKEN(recruited.getKeyValueStoreType);
					DUMPTOKEN(recruited.watchValue);
					DUMPTOKEN(recruited.getKeyValuesStream);
					DUMPTOKEN(recruited.getKeyValuesAndFlatMap);
					// printf("Recruited as storageServer\n");

					std::string filename =
					    filenameFromId(req.storeType,
					                   folder,
					                   isTss ? testingStoragePrefix.toString() : fileStoragePrefix.toString(),
					                   recruited.id());
					IKeyValueStore* data = openKVStore(req.storeType, filename, recruited.id(), memoryLimit);
					Future<Void> kvClosed = data->onClosed();
					filesClosed.add(kvClosed);
					ReplyPromise<InitializeStorageReply> storageReady = req.reply;
					storageCache.set(req.reqId, storageReady.getFuture());
					Future<Void> s = storageServer(data,
					                               recruited,
					                               req.seedTag,
					                               req.clusterId,
					                               isTss ? req.tssPairIDAndVersion.get().second : 0,
					                               storageReady,
					                               dbInfo,
					                               folder);
					s = handleIOErrors(s, data, recruited.id(), kvClosed);
					s = storageCache.removeOnReady(req.reqId, s);
					s = storageServerRollbackRebooter(&runningStorages,
					                                  s,
					                                  req.storeType,
					                                  filename,
					                                  recruited.id(),
					                                  recruited.locality,
					                                  isTss,
					                                  dbInfo,
					                                  folder,
					                                  &filesClosed,
					                                  memoryLimit,
					                                  data);
					errorForwarders.add(forwardError(errors, ssRole, recruited.id(), s));
				} else if (storageCache.exists(req.reqId)) {
					forwardPromise(req.reply, storageCache.get(req.reqId));
				} else {
					TraceEvent("AttemptedDoubleRecruitment", interf.id()).detail("ForRole", "StorageServer");
					errorForwarders.add(map(delay(0.5), [reply = req.reply](Void) {
						reply.sendError(recruitment_failed());
						return Void();
					}));
				}
			}
			when(InitializeBlobWorkerRequest req = waitNext(interf.blobWorker.getFuture())) {
				BlobWorkerInterface recruited(locality, req.interfaceId);
				recruited.initEndpoints();
				startRole(Role::BLOB_WORKER, recruited.id(), interf.id());

				ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady = req.reply;
				Future<Void> bw = blobWorker(recruited, blobWorkerReady, dbInfo);
				errorForwarders.add(forwardError(errors, Role::BLOB_WORKER, recruited.id(), bw));
			}
			when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::CommitProxy;
				CommitProxyInterface recruited;
				recruited.processId = locality.processId();
				recruited.provisional = false;
				recruited.initEndpoints();

				std::map<std::string, std::string> details;
				details["ForMaster"] = req.master.id().shortString();
				startRole(Role::COMMIT_PROXY, recruited.id(), interf.id(), details);

				DUMPTOKEN(recruited.commit);
				DUMPTOKEN(recruited.getConsistentReadVersion);
				DUMPTOKEN(recruited.getKeyServersLocations);
				DUMPTOKEN(recruited.getStorageServerRejoinInfo);
				DUMPTOKEN(recruited.waitFailure);
				DUMPTOKEN(recruited.txnState);

				// printf("Recruited as commitProxyServer\n");
				errorForwarders.add(zombie(recruited,
				                           forwardError(errors,
				                                        Role::COMMIT_PROXY,
				                                        recruited.id(),
				                                        commitProxyServer(recruited, req, dbInfo, whitelistBinPaths))));
				req.reply.send(recruited);
			}
			when(InitializeGrvProxyRequest req = waitNext(interf.grvProxy.getFuture())) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::GrvProxy;
				GrvProxyInterface recruited;
				recruited.processId = locality.processId();
				recruited.provisional = false;
				recruited.initEndpoints();

				std::map<std::string, std::string> details;
				details["ForMaster"] = req.master.id().shortString();
				startRole(Role::GRV_PROXY, recruited.id(), interf.id(), details);

				DUMPTOKEN(recruited.getConsistentReadVersion);
				DUMPTOKEN(recruited.waitFailure);
				DUMPTOKEN(recruited.getHealthMetrics);

				// printf("Recruited as grvProxyServer\n");
				errorForwarders.add(zombie(
				    recruited,
				    forwardError(errors, Role::GRV_PROXY, recruited.id(), grvProxyServer(recruited, req, dbInfo))));
				req.reply.send(recruited);
			}
			when(InitializeResolverRequest req = waitNext(interf.resolver.getFuture())) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Resolver;
				ResolverInterface recruited;
				recruited.locality = locality;
				recruited.initEndpoints();

				std::map<std::string, std::string> details;
				startRole(Role::RESOLVER, recruited.id(), interf.id(), details);

				DUMPTOKEN(recruited.resolve);
				DUMPTOKEN(recruited.metrics);
				DUMPTOKEN(recruited.split);
				DUMPTOKEN(recruited.waitFailure);

				errorForwarders.add(zombie(
				    recruited, forwardError(errors, Role::RESOLVER, recruited.id(), resolver(recruited, req, dbInfo))));
				req.reply.send(recruited);
			}
			when(InitializeLogRouterRequest req = waitNext(interf.logRouter.getFuture())) {
				LocalLineage _;
				getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::LogRouter;
				TLogInterface recruited(locality);
				recruited.initEndpoints();

				std::map<std::string, std::string> details;
				startRole(Role::LOG_ROUTER, recruited.id(), interf.id(), details);

				DUMPTOKEN(recruited.peekMessages);
				DUMPTOKEN(recruited.peekStreamMessages);
				DUMPTOKEN(recruited.popMessages);
				DUMPTOKEN(recruited.commit);
				DUMPTOKEN(recruited.lock);
				DUMPTOKEN(recruited.getQueuingMetrics);
				DUMPTOKEN(recruited.confirmRunning);
				DUMPTOKEN(recruited.waitFailure);
				DUMPTOKEN(recruited.recoveryFinished);
				DUMPTOKEN(recruited.disablePopRequest);
				DUMPTOKEN(recruited.enablePopRequest);
				DUMPTOKEN(recruited.snapRequest);

				errorForwarders.add(
				    zombie(recruited,
				           forwardError(errors, Role::LOG_ROUTER, recruited.id(), logRouter(recruited, req, dbInfo))));
				req.reply.send(recruited);
			}
			when(CoordinationPingMessage m = waitNext(interf.coordinationPing.getFuture())) {
				TraceEvent("CoordinationPing", interf.id())
				    .detail("CCID", m.clusterControllerId)
				    .detail("TimeStep", m.timeStep);
			}
			when(SetMetricsLogRateRequest req = waitNext(interf.setMetricsRate.getFuture())) {
				TraceEvent("LoggingRateChange", interf.id())
				    .detail("OldDelay", loggingDelay)
				    .detail("NewLogPS", req.metricsLogsPerSecond);
				if (req.metricsLogsPerSecond != 0) {
					loggingDelay = 1.0 / req.metricsLogsPerSecond;
					loggingTrigger = Void();
				}
			}
			when(EventLogRequest req = waitNext(interf.eventLogRequest.getFuture())) {
				TraceEventFields e;
				if (req.getLastError)
					e = latestEventCache.getLatestError();
				else
					e = latestEventCache.get(req.eventName.toString());
				req.reply.send(e);
			}
			when(TraceBatchDumpRequest req = waitNext(interf.traceBatchDumpRequest.getFuture())) {
				g_traceBatch.dump();
				req.reply.send(Void());
			}
			when(DiskStoreRequest req = waitNext(interf.diskStoreRequest.getFuture())) {
				Standalone<VectorRef<UID>> ids;
				for (DiskStore d : getDiskStores(folder)) {
					bool included = true;
					if (!req.includePartialStores) {
						if (d.storeType == KeyValueStoreType::SSD_BTREE_V1) {
							included = fileExists(d.filename + ".fdb-wal");
						} else if (d.storeType == KeyValueStoreType::SSD_BTREE_V2) {
							included = fileExists(d.filename + ".sqlite-wal");
						} else if (d.storeType == KeyValueStoreType::SSD_REDWOOD_V1) {
							included = fileExists(d.filename + "0.pagerlog") && fileExists(d.filename + "1.pagerlog");
						} else if (d.storeType == KeyValueStoreType::SSD_ROCKSDB_V1) {
							included = fileExists(joinPath(d.filename, "CURRENT")) &&
							           fileExists(joinPath(d.filename, "IDENTITY"));
						} else if (d.storeType == KeyValueStoreType::MEMORY) {
							included = fileExists(d.filename + "1.fdq");
						} else {
							ASSERT(d.storeType == KeyValueStoreType::MEMORY_RADIXTREE);
							included = fileExists(d.filename + "1.fdr");
						}
						if (d.storedComponent == DiskStore::COMPONENT::TLogData && included) {
							included = false;
							// The previous code assumed that d.filename is a filename.  But that is not true.
							// d.filename is a path. Removing a prefix and adding a new one just makes a broken
							// directory name.  So fileExists would always return false.
							// Weirdly, this doesn't break anything, as tested by taking a clean check of FDB,
							// setting included to false always, and then running correctness.  So I'm just
							// improving the situation by actually marking it as broken.
							// FIXME: this whole thing
							/*
							std::string logDataBasename;
							StringRef filename = d.filename;
							if (filename.startsWith(fileLogDataPrefix)) {
							    logDataBasename = fileLogQueuePrefix.toString() +
							d.filename.substr(fileLogDataPrefix.size()); } else { StringRef optionsString =
							filename.removePrefix(fileVersionedLogDataPrefix).eat("-"); logDataBasename =
							fileLogQueuePrefix.toString() + optionsString.toString() + "-";
							}
							TraceEvent("DiskStoreRequest").detail("FilenameBasename", logDataBasename);
							if (fileExists(logDataBasename + "0.fdq") && fileExists(logDataBasename + "1.fdq")) {
							    included = true;
							}
							*/
						}
					}
					if (included) {
						ids.push_back(ids.arena(), d.storeID);
					}
				}
				req.reply.send(ids);
			}
			when(wait(loggingTrigger)) {
				systemMonitor();
				loggingTrigger = delay(loggingDelay, TaskPriority::FlushTrace);
			}
			when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) {
				Standalone<StringRef> snapFolder = StringRef(folder);
				if (snapReq.role.toString() == "coord") {
					snapFolder = coordFolder;
				}
				errorForwarders.add(workerSnapCreate(snapReq, snapFolder));
			}
			when(wait(errorForwarders.getResult())) {}
			when(wait(handleErrors)) {}
		}
	} catch (Error& err) {
		// Make sure actors are cancelled before "recovery" promises are destructed.
		for (auto f : recoveries)
			f.cancel();
		state Error e = err;
		bool ok = e.code() == error_code_please_reboot || e.code() == error_code_actor_cancelled ||
		          e.code() == error_code_please_reboot_delete;

		endRole(Role::WORKER, interf.id(), "WorkerError", ok, e);
		errorForwarders.clear(false);
		sharedLogs.clear();

		if (e.code() !=
		    error_code_actor_cancelled) { // We get cancelled e.g. when an entire simulation times out, but in that case
			                              // we won't be restarted and don't need to wait for shutdown
			stopping.send(Void());
			wait(filesClosed.getResult()); // Wait for complete shutdown of KV stores
			wait(delay(0.0)); // Unwind the callstack to make sure that IAsyncFile references are all gone
			TraceEvent(SevInfo, "WorkerShutdownComplete", interf.id());
		}

		throw e;
	}
}