in src/prod/src/Reliability/LoadBalancing/PlacementCreator.cpp [465:922]
// Builds one PartitionEntry per failover unit found in partitionClosure_->Partitions
// and appends it to partitionEntries_.
//
// For each failover unit the function:
//   1. Walks the existing replica descriptions and creates PlacementReplica objects,
//      stored in allReplicas_ (regular replicas) or standByReplicas_ (StandBy replicas).
//   2. Records upgrade-related locations (primary swap-out, primary/secondary upgrade).
//   3. Looks up the ServiceEntry and decides the singleton-replica upgrade optimization.
//   4. Computes the replica difference (recomputing it for the INT_MAX / scaled-to-every-node
//      cases), then creates new replicas to place or records the extra replica count.
//   5. Computes the affinity ordering value and whether the partition is movable.
//
// Other members updated here: quorumBasedPartitionsCount_, partitionsInUpgradeCount_,
// ServiceEntry::HasAffinityAssociatedSingletonReplicaInUpgrade and
// ApplicationEntry::HasPartitionsInSingletonReplicaUpgrade (both through const_cast).
// Partitions with no service entry, or with no replicas and no StandBy replicas, are skipped.
void PlacementCreator::CreatePartitionEntries()
{
// Running total used only to size allReplicas_ / standByReplicas_ before filling them.
size_t allReplicaCount = 0;
// Index given to every PlacementReplica created below; shared by allReplicas_ and standByReplicas_.
size_t replicaIndex = 0;
// Pass 1: estimate the number of PlacementReplica objects (existing replicas plus
// positive replica difference, the latter capped by the number of up nodes).
for (auto it = partitionClosure_->Partitions.begin(); it != partitionClosure_->Partitions.end(); ++it)
{
FailoverUnit const& failoverUnit = *(*it);
int replicaDiff = 0;
if (failoverUnit.ActualReplicaDifference > 0)
{
replicaDiff = min(failoverUnit.ActualReplicaDifference, static_cast<int>(plb_.upNodeCount_));
}
allReplicaCount += failoverUnit.FuDescription.Replicas.size() + replicaDiff;
}
// Both vectors are reserved with the same estimate.
allReplicas_.reserve(allReplicaCount);
standByReplicas_.reserve(allReplicaCount);
bool isCurrentActionConstraintCheck = partitionClosure_->IsForConstraintCheck();
// Constraint check fixes during application upgrade require both the constraint check
// phase and the corresponding PLB configuration switch.
bool allowConstraintCheckFixesDuringAppUpgrade =
isCurrentActionConstraintCheck
&& PLBConfig::GetConfig().AllowConstraintCheckFixesDuringApplicationUpgrade;
// Pass 2: build the replicas and the PartitionEntry for each failover unit.
for (auto it = partitionClosure_->Partitions.begin(); it != partitionClosure_->Partitions.end(); ++it)
{
FailoverUnit const& failoverUnit = *(*it);
FailoverUnitDescription const& fuDesc = failoverUnit.FuDescription;
Service const& service = serviceDomain_.GetService(fuDesc.ServiceId);
ServiceType const& serviceType = plb_.GetServiceType(service.ServiceDesc);
// Non-owning pointers to this partition's replicas; ownership stays in allReplicas_.
vector<PlacementReplica *> replicas;
replicas.reserve(fuDesc.Replicas.size());
size_t upReplicaCount = 0;
// Non-owning pointers to this partition's StandBy replicas; ownership stays in standByReplicas_.
vector<PlacementReplica *> partitionStandBy;
vector<NodeEntry const*> standByLocations;
NodeEntry const* primarySwapOutLocation = nullptr;
NodeEntry const* primaryUpgradeLocation = nullptr;
vector<NodeEntry const*> secondaryUpgradeLocations;
PlacementReplica* primaryReplica = nullptr;
// Set when any replica carries one of the upgrade flags handled below.
bool isPartitionInUpgrade = false;
// NOTE(review): isSwapPrimary is assigned below but never read within this function.
bool isSwapPrimary = false;
// Children in affinity relationship cannot be throttled.
// Only stateful services can be throttled.
bool canBeThrottled = service.ServiceDesc.AffinitizedService == L"" && service.ServiceDesc.IsStateful;
for (ReplicaDescription const& replica : fuDesc.Replicas)
{
NodeMatchType nodeResult = NodeExistAndMatch(replica.NodeInstance);
// Replicas whose node result is below InstanceNotMatch are ignored entirely
// (presumably the node is not known to PLB — confirm against NodeMatchType).
if (nodeResult >= InstanceNotMatch)
{
uint64 nodeIndex = plb_.GetNodeIndex(replica.NodeId);
// UINT64_MAX is treated as "no index for this node"; such replicas are skipped.
if (nodeIndex != UINT64_MAX)
{
bool isReplicaInUpgrade = false;
bool isReplicaToBeDropped = replica.IsToBeDropped;
// Up replicas are counted before any role-based filtering below.
if (replica.IsUp)
{
++upReplicaCount;
}
// The current replica node may already be de-activated, the nodeResult would be 1
if (replica.IsPrimaryToBeSwappedOut && replica.CurrentRole == ReplicaRole::Enum::Primary)
{
// Ignore the I flag if it is set on secondary
primarySwapOutLocation = &(balanceChecker_->Nodes[nodeIndex]);
isPartitionInUpgrade = true;
isReplicaInUpgrade = true;
isSwapPrimary = true;
if (isReplicaToBeDropped && failoverUnit.ActualReplicaDifference > 0 && fuDesc.Replicas.size() == 1)
{
// Special case for single replica to be upgraded; new replica should be placed in different UDs
isReplicaToBeDropped = false;
}
}
ReplicaRole::Enum currentRole = replica.CurrentRole;
if (replica.IsPrimaryToBePlaced && !replica.ShouldDisappear)
{
// Ignore the J flag if it is set on MoveInProgress or ToBeDropped replica
primaryUpgradeLocation = &(balanceChecker_->Nodes[nodeIndex]);
isPartitionInUpgrade = true;
isReplicaInUpgrade = true;
}
else if (replica.IsReplicaToBePlaced)
{
secondaryUpgradeLocations.push_back(&(balanceChecker_->Nodes[nodeIndex]));
isPartitionInUpgrade = true;
isReplicaInUpgrade = true;
}
// Dropped replicas get no PlacementReplica, but the up count and upgrade
// locations recorded above are kept.
if (currentRole == ReplicaRole::Dropped)
{
continue;
}
else if (currentRole == ReplicaRole::StandBy)
{
// StandBy replicas are tracked separately from regular replicas.
// The literal arguments follow the same positions as the named flags
// in the allReplicas_ construction further down.
standByLocations.push_back(&(balanceChecker_->Nodes[nodeIndex]));
standByReplicas_.push_back(make_unique<PlacementReplica>(
replicaIndex++,
currentRole,
false,
false,
true,
&(balanceChecker_->Nodes[nodeIndex]),
replica.IsMoveInProgress,
isReplicaToBeDropped,
isReplicaInUpgrade,
false,
canBeThrottled));
partitionStandBy.push_back(standByReplicas_.back().get());
}
else
{
// Allow constraint check fixes for replicas on a Restart/RemoveData/RemoveNode nodes
// while safety checks are in progress.
bool isMovableDuringDeactivation = isCurrentActionConstraintCheck && nodeResult == IsRestartingOrRemoving;
// Note: && binds tighter than ||, so the middle term is
// (nodeResult != Match && !isMovableDuringDeactivation).
bool isInTransition =
replica.IsInTransition ||
// instance is not match or node is paused or node is deactivated
nodeResult != Match && !isMovableDuringDeactivation ||
serviceType.ServiceTypeDesc.IsInBlockList(replica.NodeId);
// If partition is in upgrade, all replicas are not movable
// if PLB config prevent constraint check during application upgrade.
// If constraint check is allowed during application upgrade
// all replicas are movable except IsPrimaryToBeSwappedOut replica (marked with I) or
// primary replica together with IsPrimaryToBePlaced replica (marked with J).
bool isMovable = currentRole != ReplicaRole::None && !isInTransition && !fuDesc.IsInQuorumLost;
// Allow constraint check fixes for all other replicas except replicas that are currently in app upgrade
bool movableDueToAppUpgrade =
!fuDesc.IsInUpgrade ||
allowConstraintCheckFixesDuringAppUpgrade && !isReplicaInUpgrade;
isMovable = isMovable && movableDueToAppUpgrade;
// Don't drop extra replicas only from paused node.
bool isDroppable = nodeResult != IsPaused;
// Check if singleton replica is movable during upgrade
bool isSingletonReplicaMovableDuringUpgrade = service.ServiceDesc.TargetReplicaSetSize == 1 ?
PlacementReplica::CheckIfSingletonReplicaIsMovableDuringUpgrade(replica) :
false;
allReplicas_.push_back(make_unique<PlacementReplica>(
replicaIndex++,
currentRole,
isMovable,
isDroppable,
isInTransition,
&(balanceChecker_->Nodes[nodeIndex]),
replica.IsMoveInProgress,
isReplicaToBeDropped,
isReplicaInUpgrade,
isSingletonReplicaMovableDuringUpgrade,
canBeThrottled));
// Remember the primary so it can be pinned after the loop if needed.
if (currentRole == ReplicaRole::Primary)
{
primaryReplica = allReplicas_.back().get();
}
replicas.push_back(allReplicas_.back().get());
}
}
}
}
// When a primary upgrade location (J flag) exists, the current primary is made non-movable.
if (primaryReplica != nullptr && primaryUpgradeLocation != nullptr)
{
primaryReplica->IsMovable = false;
}
// get service entry
auto itService = serviceEntryDict_.find(fuDesc.ServiceId);
if (itService == serviceEntryDict_.end())
{
// The assert condition repeats the enclosing check; after the assert the miss is
// traced and the partition is skipped.
// Replicas already pushed to allReplicas_ / standByReplicas_ for this partition stay there.
TESTASSERT_IF(itService == serviceEntryDict_.end(), "No such service entry {0} for a partition entry {1}",
fuDesc.ServiceName,
fuDesc.FUId);
serviceDomain_.plb_.Trace.PlacementCreatorServiceNotFound(fuDesc.FUId, fuDesc.ServiceName);
// Ignore this partition while searching.
continue;
}
ServiceEntry* serviceEntry = itService->second;
// Count partitions for which any quorum-based distribution switch is on.
if (serviceEntry->AutoSwitchToQuorumBasedLogic ||
plb_.Settings.QuorumBasedReplicaDistributionPerFaultDomains ||
plb_.Settings.QuorumBasedReplicaDistributionPerUpgradeDomains)
{
quorumBasedPartitionsCount_++;
}
// Is partition in singleton replica upgrade
bool isSingletonReplicaUpgrade = isPartitionInUpgrade &&
serviceEntry->TargetReplicaSetSize == 1 &&
failoverUnit.ActualReplicaDifference == 1 &&
plb_.isSingletonReplicaMoveAllowedDuringUpgradeEntry_.GetValue();
// A target-1 service that currently has more than one (non-StandBy, non-Dropped) replica.
bool isSingletonReplicaInUpgradeTransition = serviceEntry->TargetReplicaSetSize == 1 && replicas.size() > 1;
PartitionEntry::SingletonReplicaUpgradeOptimizationType upgradeOptimization = PartitionEntry::SingletonReplicaUpgradeOptimizationType::None;
// Set appropriate singleton upgrade optimization
if ((isSingletonReplicaUpgrade ||
isSingletonReplicaInUpgradeTransition) &&
balanceChecker_->Settings.CheckAffinityForUpgradePlacement &&
((serviceEntry->DependedService != nullptr && // child: has parent service with TRC=1
serviceEntry->DependedService->TargetReplicaSetSize == 1) ||
serviceEntry->DependentServices.size() > 0)) // parent: has child services
{
// If it is initial upgrade FT update, set the appropriate optimization
if (isSingletonReplicaUpgrade)
{
upgradeOptimization = PartitionEntry::SingletonReplicaUpgradeOptimizationType::CheckAffinityDuringUpgrade;
}
serviceEntry->HasAffinityAssociatedSingletonReplicaInUpgrade = true;
}
else if ((isSingletonReplicaUpgrade ||
isSingletonReplicaInUpgradeTransition) &&
balanceChecker_->Settings.RelaxScaleoutConstraintDuringUpgrade &&
serviceEntry->Application != nullptr &&
serviceEntry->Application->ScaleoutCount == 1 &&
serviceEntry->Application->IsInSingletonReplicaUpgrade(plb_, replicas))
{
// If it is initial upgrade FT update, set the appropriate optimization
if (isSingletonReplicaUpgrade)
{
upgradeOptimization = PartitionEntry::SingletonReplicaUpgradeOptimizationType::RelaxScaleoutDuringUpgrade;
}
// The application is reached through a const pointer; constness is cast away to set the flag.
ApplicationEntry* application = const_cast<ApplicationEntry*>(serviceEntry->Application);
application->HasPartitionsInSingletonReplicaUpgrade = true;
}
// Start from the failover unit's own replica difference; it may be recomputed below.
int replicaDiff = failoverUnit.ActualReplicaDifference;
bool isScaledToEveryNode = false;
int minInstanceCount = 0;
// Auto scaling by partition instance count: a maximum of -1 marks the partition as
// scaled to every node; the minimum is kept for the drop calculation below.
if (service.ServiceDesc.IsAutoScalingDefined
&& service.ServiceDesc.AutoScalingPolicy.IsPartitionScaled()
&& service.ServiceDesc.AutoScalingPolicy.Mechanism->Kind == ScalingMechanismKind::PartitionInstanceCount)
{
InstanceCountScalingMechanismSPtr mechanism = static_pointer_cast<InstanceCountScalingMechanism>(service.ServiceDesc.AutoScalingPolicy.Mechanism);
if (mechanism->MaximumInstanceCount == -1)
{
isScaledToEveryNode = true;
}
minInstanceCount = mechanism->MinimumInstanceCount;
}
// Parsed as: INT_MAX == replicaDiff || (isScaledToEveryNode && replicaDiff == 0).
if (INT_MAX == replicaDiff || isScaledToEveryNode && replicaDiff == 0)
{
int numReplicasOnBlocklistedNodes = 0;
// Set of node indexes that cannot receive a new replica; starts with deactivated and down nodes.
NodeSet nodes(static_cast<int>(plb_.nodes_.size()));
for (auto dIt = balanceChecker_->DeactivatedNodes.begin(); dIt != balanceChecker_->DeactivatedNodes.end(); ++dIt)
{
nodes.Add(*dIt);
}
for (auto downIt = balanceChecker_->DownNodes.begin(); downIt != balanceChecker_->DownNodes.end(); ++downIt)
{
nodes.Add(*downIt);
}
// replicaDiff should be recalculated for FTs with target -1, and for FT's that have maxInstanceCount -1 and target is above nonBlocklisted node count
// If target is below nonBlocklisted node count, replicas on blocklisted nodes should be moved in Constraint check phase.
if (INT_MAX == replicaDiff || failoverUnit.FuDescription.TargetReplicaSetSize > static_cast<int>(plb_.nodes_.size()) - nodes.GetTotalOneCount())
{
// Add the service's block-listed nodes and the nodes that already host a replica.
serviceEntry->BlockList.ForEach([&](size_t nodeIndex) -> void
{
nodes.Add(static_cast<int>(nodeIndex));
});
for (auto rIt = replicas.begin(); rIt != replicas.end(); ++rIt)
{
if (serviceEntry->IsNodeInBlockList((*rIt)->Node))
{
numReplicasOnBlocklistedNodes++;
}
nodes.Add((*rIt)->Node);
}
// Whatever is left outside the set is the number of replicas still to place.
replicaDiff = static_cast<int>(plb_.nodes_.size()) - nodes.GetTotalOneCount();
// Do not place more replicas if we are already over scaleout count
if (serviceEntry->Application != nullptr
&& serviceEntry->Application->ScaleoutCount > 0
&& serviceEntry->Application->ScaleoutCount <= replicas.size())
{
replicaDiff = 0;
}
// A negative result is unexpected: assert in test builds, trace, and clamp to zero.
if (replicaDiff < 0)
{
TESTASSERT_IF(replicaDiff < 0, "The calculated replicaDiff {0} shouldn't be less than 0", replicaDiff);
serviceDomain_.plb_.Trace.PlacementReplicaDiffError(fuDesc.FUId, replicaDiff);
replicaDiff = 0;
}
if (0 == replicaDiff)
{
// Drop replicas that are on blocklisted nodes
replicaDiff = -numReplicasOnBlocklistedNodes;
// For scaled-to-every-node partitions, bound the result from below by
// minInstanceCount minus the current replica count.
if (isScaledToEveryNode)
{
replicaDiff = max(replicaDiff, minInstanceCount - (int)replicas.size());
}
}
}
}
// Never request more new replicas than there are up nodes.
replicaDiff = min(replicaDiff, static_cast<int>(plb_.upNodeCount_));
int extraReplicaCount = 0;
if (replicaDiff > 0)
{
if (service.ServiceDesc.IsStateful)
{
// A primary counts as present when it is not to be dropped, or when it is to be
// dropped but is also marked as primary to be swapped out.
bool havingPrimary = any_of(failoverUnit.FuDescription.Replicas.begin(), failoverUnit.FuDescription.Replicas.end(), [](ReplicaDescription const& r)
{
if (r.CurrentRole == ReplicaRole::Primary && !r.IsToBeDropped)
{
return true;
}
else if (r.CurrentRole == ReplicaRole::Primary && r.IsToBeDropped && r.IsPrimaryToBeSwappedOut)
{
return true;
}
else
{
return false;
}
});
// Without a primary, the first new replica is created as Primary.
if (!havingPrimary)
{
// Primary build cannot be throttled if there are no replicas in the configuration.
bool canPrimaryBeThrottled = canBeThrottled &&
any_of(failoverUnit.FuDescription.Replicas.begin(),
failoverUnit.FuDescription.Replicas.end(),
[](ReplicaDescription const& r) {
if (r.CurrentRole == ReplicaRole::Primary || r.CurrentRole == ReplicaRole::Secondary)
{
return true;
}
return false;
});
allReplicas_.push_back(make_unique<PlacementReplica>(replicaIndex++, ReplicaRole::Primary, canPrimaryBeThrottled));
replicas.push_back(allReplicas_.back().get());
replicaDiff--;
}
}
// The remaining new replicas are created with the Secondary role (also used for stateless services).
// NOTE(review): the third argument here is IsStateful, not canBeThrottled, so the affinity-child
// exclusion applied to the primary above is not applied here — confirm this is intended.
while (replicaDiff > 0)
{
allReplicas_.push_back(make_unique<PlacementReplica>(replicaIndex++, ReplicaRole::Secondary, service.ServiceDesc.IsStateful));
replicas.push_back(allReplicas_.back().get());
replicaDiff--;
}
}
else
{
// Zero or negative difference: record how many replicas are in excess.
extraReplicaCount = -replicaDiff;
}
if (replicas.empty() && partitionStandBy.empty())
{
// don't include empty partitions
continue;
}
// set the affinity order by ServiceEntry
int order = 0;
ServiceEntry const* dependedServiceEntry = serviceEntry->DependedService;
// Walk up the DependedService chain; each level adds 2 to the order.
while (dependedServiceEntry != nullptr)
{
// This is a child service, needs to be sorted after parent
order += 2;
// Set parent flag for upgrade as well, hence proper initialization is done,
// in scenarios when only child partition is in upgrade
if ((isSingletonReplicaUpgrade ||
isSingletonReplicaInUpgradeTransition) &&
balanceChecker_->Settings.CheckAffinityForUpgradePlacement)
{
const_cast<ServiceEntry*>(dependedServiceEntry)->HasAffinityAssociatedSingletonReplicaInUpgrade = true;
}
dependedServiceEntry = dependedServiceEntry->DependedService;
}
// NOTE(review): this compares a size_t with TargetReplicaSetSize, which appears to be a signed
// int (see the static_cast<int> comparison earlier); a negative target would be converted to a
// large unsigned value and this branch would never be taken — confirm.
if (upReplicaCount >= failoverUnit.FuDescription.TargetReplicaSetSize)
{
// Up replica count is at (or above) target here, so the order is bumped by one;
// partitions that are NOT at target keep the lower order and are thereby "prioritized".
// Final order number will be:
// 0 - parent not on target, 1 - parent on target, 2 - child not on target, 3 - child on target
// "Prioritization" is done in searcher, by placing lower order first.
// Optimization for later: do better ordering (i.e. parents with children on target to the back).
++order;
}
// Copy the load entries so they can be moved into the LoadEntry objects below.
vector<int64> primaryEntry(failoverUnit.PrimaryEntries.begin(), failoverUnit.PrimaryEntries.end());
vector<int64> secondaryEntry(failoverUnit.SecondaryEntries.begin(), failoverUnit.SecondaryEntries.end());
if (isPartitionInUpgrade)
{
partitionsInUpgradeCount_++;
}
// Upgrade state from either the replica flags seen above or the failover unit description.
bool aggregatedIsPartitionInUpgrade = isPartitionInUpgrade || fuDesc.IsInUpgrade;
// If partition is in upgrade, all replicas are not movable.
// Except if current PLB phase is ConstraintCheck and fixes are allowed during application upgrade.
// A partition listed in throttledPartitions_ is never movable.
bool isPartitionMovable = throttledPartitions_.end() == throttledPartitions_.find(fuDesc.FUId);
if (!(aggregatedIsPartitionInUpgrade && allowConstraintCheckFixesDuringAppUpgrade))
{
isPartitionMovable = isPartitionMovable && !fuDesc.IsInTransition && !aggregatedIsPartitionInUpgrade;
}
// The local vectors are moved into the entry; they must not be used after this point.
partitionEntries_.push_back(PartitionEntry(
fuDesc.FUId,
fuDesc.Version,
fuDesc.IsInTransition,
isPartitionMovable,
serviceEntry,
move(replicas),
LoadEntry(move(primaryEntry)),
LoadEntry(move(secondaryEntry)),
failoverUnit.GetMoveCostValue(ReplicaRole::Primary, balanceChecker_->Settings),
failoverUnit.GetMoveCostValue(ReplicaRole::Secondary, balanceChecker_->Settings),
move(standByLocations),
primarySwapOutLocation,
primaryUpgradeLocation,
move(secondaryUpgradeLocations),
isPartitionInUpgrade,
fuDesc.IsInUpgrade,
isSingletonReplicaUpgrade,
upgradeOptimization,
order,
extraReplicaCount,
failoverUnit.SecondaryEntriesMap,
move(partitionStandBy),
balanceChecker_->Settings,
failoverUnit.TargetReplicaSetSize));
}
}