in fdbserver/ClusterController.actor.h [2322:2821]
bool betterMasterExists() {
const ServerDBInfo dbi = db.serverInfo->get();
if (dbi.recoveryState < RecoveryState::ACCEPTING_COMMITS) {
return false;
}
// Do not trigger better master exists if the cluster controller is excluded, since the master will change
// anyways once the cluster controller is moved
if (id_worker[clusterControllerProcessId].priorityInfo.isExcluded) {
TraceEvent("NewRecruitmentIsWorse", id).detail("Reason", "ClusterControllerExcluded");
return false;
}
if (db.config.regions.size() > 1 && db.config.regions[0].priority > db.config.regions[1].priority &&
db.config.regions[0].dcId != clusterControllerDcId.get() && versionDifferenceUpdated &&
datacenterVersionDifference < SERVER_KNOBS->MAX_VERSION_DIFFERENCE && remoteDCIsHealthy()) {
checkRegions(db.config.regions);
}
// Get master process
auto masterWorker = id_worker.find(dbi.master.locality.processId());
if (masterWorker == id_worker.end()) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("Reason", "CannotFindMaster")
.detail("ProcessID", dbi.master.locality.processId());
return false;
}
// Get tlog processes
std::vector<WorkerDetails> tlogs;
std::vector<WorkerDetails> remote_tlogs;
std::vector<WorkerDetails> satellite_tlogs;
std::vector<WorkerDetails> log_routers;
std::set<NetworkAddress> logRouterAddresses;
std::vector<WorkerDetails> backup_workers;
std::set<NetworkAddress> backup_addresses;
for (auto& logSet : dbi.logSystemConfig.tLogs) {
for (auto& it : logSet.tLogs) {
auto tlogWorker = id_worker.find(it.interf().filteredLocality.processId());
if (tlogWorker == id_worker.end()) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("Reason", "CannotFindTLog")
.detail("ProcessID", it.interf().filteredLocality.processId());
return false;
}
if (tlogWorker->second.priorityInfo.isExcluded) {
TraceEvent("BetterMasterExists", id)
.detail("Reason", "TLogExcluded")
.detail("ProcessID", it.interf().filteredLocality.processId());
return true;
}
if (logSet.isLocal && logSet.locality == tagLocalitySatellite) {
satellite_tlogs.push_back(tlogWorker->second.details);
} else if (logSet.isLocal) {
tlogs.push_back(tlogWorker->second.details);
} else {
remote_tlogs.push_back(tlogWorker->second.details);
}
}
for (auto& it : logSet.logRouters) {
auto tlogWorker = id_worker.find(it.interf().filteredLocality.processId());
if (tlogWorker == id_worker.end()) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("Reason", "CannotFindLogRouter")
.detail("ProcessID", it.interf().filteredLocality.processId());
return false;
}
if (tlogWorker->second.priorityInfo.isExcluded) {
TraceEvent("BetterMasterExists", id)
.detail("Reason", "LogRouterExcluded")
.detail("ProcessID", it.interf().filteredLocality.processId());
return true;
}
if (!logRouterAddresses.count(tlogWorker->second.details.interf.address())) {
logRouterAddresses.insert(tlogWorker->second.details.interf.address());
log_routers.push_back(tlogWorker->second.details);
}
}
for (const auto& worker : logSet.backupWorkers) {
auto workerIt = id_worker.find(worker.interf().locality.processId());
if (workerIt == id_worker.end()) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("Reason", "CannotFindBackupWorker")
.detail("ProcessID", worker.interf().locality.processId());
return false;
}
if (workerIt->second.priorityInfo.isExcluded) {
TraceEvent("BetterMasterExists", id)
.detail("Reason", "BackupWorkerExcluded")
.detail("ProcessID", worker.interf().locality.processId());
return true;
}
if (backup_addresses.count(workerIt->second.details.interf.address()) == 0) {
backup_addresses.insert(workerIt->second.details.interf.address());
backup_workers.push_back(workerIt->second.details);
}
}
}
// Get commit proxy classes
std::vector<WorkerDetails> commitProxyClasses;
for (auto& it : dbi.client.commitProxies) {
auto commitProxyWorker = id_worker.find(it.processId);
if (commitProxyWorker == id_worker.end()) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("Reason", "CannotFindCommitProxy")
.detail("ProcessID", it.processId);
return false;
}
if (commitProxyWorker->second.priorityInfo.isExcluded) {
TraceEvent("BetterMasterExists", id)
.detail("Reason", "CommitProxyExcluded")
.detail("ProcessID", it.processId);
return true;
}
commitProxyClasses.push_back(commitProxyWorker->second.details);
}
// Get grv proxy classes
std::vector<WorkerDetails> grvProxyClasses;
for (auto& it : dbi.client.grvProxies) {
auto grvProxyWorker = id_worker.find(it.processId);
if (grvProxyWorker == id_worker.end()) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("Reason", "CannotFindGrvProxy")
.detail("ProcessID", it.processId);
return false;
}
if (grvProxyWorker->second.priorityInfo.isExcluded) {
TraceEvent("BetterMasterExists", id)
.detail("Reason", "GrvProxyExcluded")
.detail("ProcessID", it.processId);
return true;
}
grvProxyClasses.push_back(grvProxyWorker->second.details);
}
// Get resolver classes
std::vector<WorkerDetails> resolverClasses;
for (auto& it : dbi.resolvers) {
auto resolverWorker = id_worker.find(it.locality.processId());
if (resolverWorker == id_worker.end()) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("Reason", "CannotFindResolver")
.detail("ProcessID", it.locality.processId());
return false;
}
if (resolverWorker->second.priorityInfo.isExcluded) {
TraceEvent("BetterMasterExists", id)
.detail("Reason", "ResolverExcluded")
.detail("ProcessID", it.locality.processId());
return true;
}
resolverClasses.push_back(resolverWorker->second.details);
}
// Check master fitness. Don't return false if master is excluded in case all the processes are excluded, we
// still need master for recovery.
ProcessClass::Fitness oldMasterFit =
masterWorker->second.details.processClass.machineClassFitness(ProcessClass::Master);
if (db.config.isExcludedServer(dbi.master.addresses())) {
oldMasterFit = std::max(oldMasterFit, ProcessClass::ExcludeFit);
}
std::map<Optional<Standalone<StringRef>>, int> id_used;
std::map<Optional<Standalone<StringRef>>, int> old_id_used;
id_used[clusterControllerProcessId]++;
old_id_used[clusterControllerProcessId]++;
WorkerFitnessInfo mworker = getWorkerForRoleInDatacenter(
clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db.config, id_used, {}, true);
auto newMasterFit = mworker.worker.processClass.machineClassFitness(ProcessClass::Master);
if (db.config.isExcludedServer(mworker.worker.interf.addresses())) {
newMasterFit = std::max(newMasterFit, ProcessClass::ExcludeFit);
}
old_id_used[masterWorker->first]++;
if (oldMasterFit < newMasterFit) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("OldMasterFit", oldMasterFit)
.detail("NewMasterFit", newMasterFit)
.detail("OldIsCC", dbi.master.locality.processId() == clusterControllerProcessId)
.detail("NewIsCC", mworker.worker.interf.locality.processId() == clusterControllerProcessId);
;
return false;
}
if (oldMasterFit > newMasterFit || (dbi.master.locality.processId() == clusterControllerProcessId &&
mworker.worker.interf.locality.processId() != clusterControllerProcessId)) {
TraceEvent("BetterMasterExists", id)
.detail("OldMasterFit", oldMasterFit)
.detail("NewMasterFit", newMasterFit)
.detail("OldIsCC", dbi.master.locality.processId() == clusterControllerProcessId)
.detail("NewIsCC", mworker.worker.interf.locality.processId() == clusterControllerProcessId);
return true;
}
std::set<Optional<Key>> primaryDC;
std::set<Optional<Key>> remoteDC;
RegionInfo region;
RegionInfo remoteRegion;
if (db.config.regions.size()) {
primaryDC.insert(clusterControllerDcId);
for (auto& r : db.config.regions) {
if (r.dcId != clusterControllerDcId.get()) {
ASSERT(remoteDC.empty());
remoteDC.insert(r.dcId);
remoteRegion = r;
} else {
ASSERT(region.dcId == StringRef());
region = r;
}
}
}
// Check tLog fitness
updateIdUsed(tlogs, old_id_used);
RoleFitness oldTLogFit(tlogs, ProcessClass::TLog, old_id_used);
auto newTLogs = getWorkersForTlogs(db.config,
db.config.tLogReplicationFactor,
db.config.getDesiredLogs(),
db.config.tLogPolicy,
id_used,
true,
primaryDC);
RoleFitness newTLogFit(newTLogs, ProcessClass::TLog, id_used);
bool oldSatelliteFallback = false;
if (region.satelliteTLogPolicyFallback.isValid()) {
for (auto& logSet : dbi.logSystemConfig.tLogs) {
if (region.satelliteTLogPolicy.isValid() && logSet.isLocal && logSet.locality == tagLocalitySatellite) {
oldSatelliteFallback = logSet.tLogPolicy->info() != region.satelliteTLogPolicy->info();
ASSERT(!oldSatelliteFallback ||
(region.satelliteTLogPolicyFallback.isValid() &&
logSet.tLogPolicy->info() == region.satelliteTLogPolicyFallback->info()));
break;
}
}
}
updateIdUsed(satellite_tlogs, old_id_used);
RoleFitness oldSatelliteTLogFit(satellite_tlogs, ProcessClass::TLog, old_id_used);
bool newSatelliteFallback = false;
auto newSatelliteTLogs = satellite_tlogs;
RoleFitness newSatelliteTLogFit = oldSatelliteTLogFit;
if (region.satelliteTLogReplicationFactor > 0 && db.config.usableRegions > 1) {
newSatelliteTLogs =
getWorkersForSatelliteLogs(db.config, region, remoteRegion, id_used, newSatelliteFallback, true);
newSatelliteTLogFit = RoleFitness(newSatelliteTLogs, ProcessClass::TLog, id_used);
}
std::map<Optional<Key>, int32_t> satellite_priority;
for (auto& r : region.satellites) {
satellite_priority[r.dcId] = r.priority;
}
int32_t oldSatelliteRegionFit = std::numeric_limits<int32_t>::max();
for (auto& it : satellite_tlogs) {
if (satellite_priority.count(it.interf.locality.dcId())) {
oldSatelliteRegionFit = std::min(oldSatelliteRegionFit, satellite_priority[it.interf.locality.dcId()]);
} else {
oldSatelliteRegionFit = -1;
}
}
int32_t newSatelliteRegionFit = std::numeric_limits<int32_t>::max();
for (auto& it : newSatelliteTLogs) {
if (satellite_priority.count(it.interf.locality.dcId())) {
newSatelliteRegionFit = std::min(newSatelliteRegionFit, satellite_priority[it.interf.locality.dcId()]);
} else {
newSatelliteRegionFit = -1;
}
}
if (oldSatelliteFallback && !newSatelliteFallback) {
TraceEvent("BetterMasterExists", id)
.detail("OldSatelliteFallback", oldSatelliteFallback)
.detail("NewSatelliteFallback", newSatelliteFallback);
return true;
}
if (!oldSatelliteFallback && newSatelliteFallback) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("OldSatelliteFallback", oldSatelliteFallback)
.detail("NewSatelliteFallback", newSatelliteFallback);
return false;
}
if (oldSatelliteRegionFit < newSatelliteRegionFit) {
TraceEvent("BetterMasterExists", id)
.detail("OldSatelliteRegionFit", oldSatelliteRegionFit)
.detail("NewSatelliteRegionFit", newSatelliteRegionFit);
return true;
}
if (oldSatelliteRegionFit > newSatelliteRegionFit) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("OldSatelliteRegionFit", oldSatelliteRegionFit)
.detail("NewSatelliteRegionFit", newSatelliteRegionFit);
return false;
}
updateIdUsed(remote_tlogs, old_id_used);
RoleFitness oldRemoteTLogFit(remote_tlogs, ProcessClass::TLog, old_id_used);
std::vector<UID> exclusionWorkerIds;
auto fn = [](const WorkerDetails& in) { return in.interf.id(); };
std::transform(newTLogs.begin(), newTLogs.end(), std::back_inserter(exclusionWorkerIds), fn);
std::transform(newSatelliteTLogs.begin(), newSatelliteTLogs.end(), std::back_inserter(exclusionWorkerIds), fn);
RoleFitness newRemoteTLogFit = oldRemoteTLogFit;
if (db.config.usableRegions > 1 && (dbi.recoveryState == RecoveryState::ALL_LOGS_RECRUITED ||
dbi.recoveryState == RecoveryState::FULLY_RECOVERED)) {
newRemoteTLogFit = RoleFitness(getWorkersForTlogs(db.config,
db.config.getRemoteTLogReplicationFactor(),
db.config.getDesiredRemoteLogs(),
db.config.getRemoteTLogPolicy(),
id_used,
true,
remoteDC,
exclusionWorkerIds),
ProcessClass::TLog,
id_used);
}
int oldRouterCount =
oldTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1, oldTLogFit.count));
int newRouterCount =
newTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1, newTLogFit.count));
updateIdUsed(log_routers, old_id_used);
RoleFitness oldLogRoutersFit(log_routers, ProcessClass::LogRouter, old_id_used);
RoleFitness newLogRoutersFit = oldLogRoutersFit;
if (db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::FULLY_RECOVERED) {
newLogRoutersFit = RoleFitness(getWorkersForRoleInDatacenter(*remoteDC.begin(),
ProcessClass::LogRouter,
newRouterCount,
db.config,
id_used,
{},
Optional<WorkerFitnessInfo>(),
true),
ProcessClass::LogRouter,
id_used);
}
if (oldLogRoutersFit.count < oldRouterCount) {
oldLogRoutersFit.worstFit = ProcessClass::NeverAssign;
}
if (newLogRoutersFit.count < newRouterCount) {
newLogRoutersFit.worstFit = ProcessClass::NeverAssign;
}
// Check proxy/grvProxy/resolver fitness
updateIdUsed(commitProxyClasses, old_id_used);
updateIdUsed(grvProxyClasses, old_id_used);
updateIdUsed(resolverClasses, old_id_used);
RoleFitness oldCommitProxyFit(commitProxyClasses, ProcessClass::CommitProxy, old_id_used);
RoleFitness oldGrvProxyFit(grvProxyClasses, ProcessClass::GrvProxy, old_id_used);
RoleFitness oldResolverFit(resolverClasses, ProcessClass::Resolver, old_id_used);
std::map<Optional<Standalone<StringRef>>, int> preferredSharing;
auto first_commit_proxy = getWorkerForRoleInDatacenter(clusterControllerDcId,
ProcessClass::CommitProxy,
ProcessClass::ExcludeFit,
db.config,
id_used,
preferredSharing,
true);
preferredSharing[first_commit_proxy.worker.interf.locality.processId()] = 0;
auto first_grv_proxy = getWorkerForRoleInDatacenter(clusterControllerDcId,
ProcessClass::GrvProxy,
ProcessClass::ExcludeFit,
db.config,
id_used,
preferredSharing,
true);
preferredSharing[first_grv_proxy.worker.interf.locality.processId()] = 1;
auto first_resolver = getWorkerForRoleInDatacenter(clusterControllerDcId,
ProcessClass::Resolver,
ProcessClass::ExcludeFit,
db.config,
id_used,
preferredSharing,
true);
preferredSharing[first_resolver.worker.interf.locality.processId()] = 2;
auto maxUsed = std::max({ first_commit_proxy.used, first_grv_proxy.used, first_resolver.used });
first_commit_proxy.used = maxUsed;
first_grv_proxy.used = maxUsed;
first_resolver.used = maxUsed;
auto commit_proxies = getWorkersForRoleInDatacenter(clusterControllerDcId,
ProcessClass::CommitProxy,
db.config.getDesiredCommitProxies(),
db.config,
id_used,
preferredSharing,
first_commit_proxy,
true);
auto grv_proxies = getWorkersForRoleInDatacenter(clusterControllerDcId,
ProcessClass::GrvProxy,
db.config.getDesiredGrvProxies(),
db.config,
id_used,
preferredSharing,
first_grv_proxy,
true);
auto resolvers = getWorkersForRoleInDatacenter(clusterControllerDcId,
ProcessClass::Resolver,
db.config.getDesiredResolvers(),
db.config,
id_used,
preferredSharing,
first_resolver,
true);
RoleFitness newCommitProxyFit(commit_proxies, ProcessClass::CommitProxy, id_used);
RoleFitness newGrvProxyFit(grv_proxies, ProcessClass::GrvProxy, id_used);
RoleFitness newResolverFit(resolvers, ProcessClass::Resolver, id_used);
// Check backup worker fitness
updateIdUsed(backup_workers, old_id_used);
RoleFitness oldBackupWorkersFit(backup_workers, ProcessClass::Backup, old_id_used);
const int nBackup = backup_addresses.size();
RoleFitness newBackupWorkersFit(getWorkersForRoleInDatacenter(clusterControllerDcId,
ProcessClass::Backup,
nBackup,
db.config,
id_used,
{},
Optional<WorkerFitnessInfo>(),
true),
ProcessClass::Backup,
id_used);
auto oldFit = std::make_tuple(oldTLogFit,
oldSatelliteTLogFit,
oldCommitProxyFit,
oldGrvProxyFit,
oldResolverFit,
oldBackupWorkersFit,
oldRemoteTLogFit,
oldLogRoutersFit);
auto newFit = std::make_tuple(newTLogFit,
newSatelliteTLogFit,
newCommitProxyFit,
newGrvProxyFit,
newResolverFit,
newBackupWorkersFit,
newRemoteTLogFit,
newLogRoutersFit);
if (oldFit > newFit) {
TraceEvent("BetterMasterExists", id)
.detail("OldMasterFit", oldMasterFit)
.detail("NewMasterFit", newMasterFit)
.detail("OldTLogFit", oldTLogFit.toString())
.detail("NewTLogFit", newTLogFit.toString())
.detail("OldSatelliteFit", oldSatelliteTLogFit.toString())
.detail("NewSatelliteFit", newSatelliteTLogFit.toString())
.detail("OldCommitProxyFit", oldCommitProxyFit.toString())
.detail("NewCommitProxyFit", newCommitProxyFit.toString())
.detail("OldGrvProxyFit", oldGrvProxyFit.toString())
.detail("NewGrvProxyFit", newGrvProxyFit.toString())
.detail("OldResolverFit", oldResolverFit.toString())
.detail("NewResolverFit", newResolverFit.toString())
.detail("OldBackupWorkerFit", oldBackupWorkersFit.toString())
.detail("NewBackupWorkerFit", newBackupWorkersFit.toString())
.detail("OldRemoteFit", oldRemoteTLogFit.toString())
.detail("NewRemoteFit", newRemoteTLogFit.toString())
.detail("OldRouterFit", oldLogRoutersFit.toString())
.detail("NewRouterFit", newLogRoutersFit.toString())
.detail("OldSatelliteFallback", oldSatelliteFallback)
.detail("NewSatelliteFallback", newSatelliteFallback);
return true;
}
if (oldFit < newFit) {
TraceEvent("NewRecruitmentIsWorse", id)
.detail("OldMasterFit", oldMasterFit)
.detail("NewMasterFit", newMasterFit)
.detail("OldTLogFit", oldTLogFit.toString())
.detail("NewTLogFit", newTLogFit.toString())
.detail("OldSatelliteFit", oldSatelliteTLogFit.toString())
.detail("NewSatelliteFit", newSatelliteTLogFit.toString())
.detail("OldCommitProxyFit", oldCommitProxyFit.toString())
.detail("NewCommitProxyFit", newCommitProxyFit.toString())
.detail("OldGrvProxyFit", oldGrvProxyFit.toString())
.detail("NewGrvProxyFit", newGrvProxyFit.toString())
.detail("OldResolverFit", oldResolverFit.toString())
.detail("NewResolverFit", newResolverFit.toString())
.detail("OldBackupWorkerFit", oldBackupWorkersFit.toString())
.detail("NewBackupWorkerFit", newBackupWorkersFit.toString())
.detail("OldRemoteFit", oldRemoteTLogFit.toString())
.detail("NewRemoteFit", newRemoteTLogFit.toString())
.detail("OldRouterFit", oldLogRoutersFit.toString())
.detail("NewRouterFit", newLogRoutersFit.toString())
.detail("OldSatelliteFallback", oldSatelliteFallback)
.detail("NewSatelliteFallback", newSatelliteFallback);
}
return false;
}