void PlacementCreator::CreatePartitionEntries()

in src/prod/src/Reliability/LoadBalancing/PlacementCreator.cpp [465:922]


void PlacementCreator::CreatePartitionEntries()
{
    size_t allReplicaCount = 0;
    size_t replicaIndex = 0;
    for (auto it = partitionClosure_->Partitions.begin(); it != partitionClosure_->Partitions.end(); ++it)
    {
        FailoverUnit const& failoverUnit = *(*it);
        int replicaDiff = 0;
        if (failoverUnit.ActualReplicaDifference > 0)
        {
            replicaDiff = min(failoverUnit.ActualReplicaDifference, static_cast<int>(plb_.upNodeCount_));
        }
        allReplicaCount += failoverUnit.FuDescription.Replicas.size() + replicaDiff;
    }

    allReplicas_.reserve(allReplicaCount);
    standByReplicas_.reserve(allReplicaCount);

    bool isCurrentActionConstraintCheck = partitionClosure_->IsForConstraintCheck();

    bool allowConstraintCheckFixesDuringAppUpgrade =
        isCurrentActionConstraintCheck
        && PLBConfig::GetConfig().AllowConstraintCheckFixesDuringApplicationUpgrade;

    for (auto it = partitionClosure_->Partitions.begin(); it != partitionClosure_->Partitions.end(); ++it)
    {
        FailoverUnit const& failoverUnit = *(*it);
        FailoverUnitDescription const& fuDesc = failoverUnit.FuDescription;

        Service const& service = serviceDomain_.GetService(fuDesc.ServiceId);

        ServiceType const& serviceType = plb_.GetServiceType(service.ServiceDesc);

        vector<PlacementReplica *> replicas;
        replicas.reserve(fuDesc.Replicas.size());
        size_t upReplicaCount = 0;
        vector<PlacementReplica *> partitionStandBy;
        vector<NodeEntry const*> standByLocations;
        NodeEntry const* primarySwapOutLocation = nullptr;
        NodeEntry const* primaryUpgradeLocation = nullptr;
        vector<NodeEntry const*> secondaryUpgradeLocations;
        PlacementReplica* primaryReplica = nullptr;

        bool isPartitionInUpgrade = false;
        bool isSwapPrimary = false;
        // Children in affinity relationship cannot be throttled.
        // Only stateful services can be throttled.
        bool canBeThrottled = service.ServiceDesc.AffinitizedService == L"" && service.ServiceDesc.IsStateful;

        for (ReplicaDescription const& replica : fuDesc.Replicas)
        {
            NodeMatchType nodeResult = NodeExistAndMatch(replica.NodeInstance);
            if (nodeResult >= InstanceNotMatch)
            {

                uint64 nodeIndex = plb_.GetNodeIndex(replica.NodeId);
                if (nodeIndex != UINT64_MAX)
                {
                    bool isReplicaInUpgrade = false;
                    bool isReplicaToBeDropped = replica.IsToBeDropped;

                    if (replica.IsUp)
                    {
                        ++upReplicaCount;
                    }

                    // The current replica node may already be de-activated, the nodeResult would be 1
                    if (replica.IsPrimaryToBeSwappedOut && replica.CurrentRole == ReplicaRole::Enum::Primary)
                    {
                        // Ignore the I flag if it is set on secondary
                        primarySwapOutLocation = &(balanceChecker_->Nodes[nodeIndex]);
                        isPartitionInUpgrade = true;
                        isReplicaInUpgrade = true;
                        isSwapPrimary = true;

                        if (isReplicaToBeDropped && failoverUnit.ActualReplicaDifference > 0 && fuDesc.Replicas.size() == 1)
                        {
                            // Special case for single replica to be upgraded; new replica should be placed in different UDs
                            isReplicaToBeDropped = false;
                        }
                    }

                    ReplicaRole::Enum currentRole = replica.CurrentRole;

                    if (replica.IsPrimaryToBePlaced && !replica.ShouldDisappear)
                    {
                        // Ignore the J flag if it is set on MoveInProgress or ToBeDropped replica
                        primaryUpgradeLocation = &(balanceChecker_->Nodes[nodeIndex]);
                        isPartitionInUpgrade = true;
                        isReplicaInUpgrade = true;
                    }
                    else if (replica.IsReplicaToBePlaced)
                    {
                        secondaryUpgradeLocations.push_back(&(balanceChecker_->Nodes[nodeIndex]));
                        isPartitionInUpgrade = true;
                        isReplicaInUpgrade = true;
                    }

                    if (currentRole == ReplicaRole::Dropped)
                    {
                        continue;
                    }
                    else if (currentRole == ReplicaRole::StandBy)
                    {
                        standByLocations.push_back(&(balanceChecker_->Nodes[nodeIndex]));
                        standByReplicas_.push_back(make_unique<PlacementReplica>(
                            replicaIndex++,
                            currentRole,
                            false,
                            false,
                            true,
                            &(balanceChecker_->Nodes[nodeIndex]),
                            replica.IsMoveInProgress,
                            isReplicaToBeDropped,
                            isReplicaInUpgrade,
                            false,
                            canBeThrottled));
                        partitionStandBy.push_back(standByReplicas_.back().get());
                    }
                    else
                    {
                        // Allow constraint check fixes for replicas on a Restart/RemoveData/RemoveNode nodes
                        // while safety checks are in progress.
                        bool isMovableDuringDeactivation = isCurrentActionConstraintCheck && nodeResult == IsRestartingOrRemoving;

                        bool isInTransition =
                            replica.IsInTransition ||
                            // instance is not match or node is paused or node is deactivated
                            nodeResult != Match && !isMovableDuringDeactivation ||
                            serviceType.ServiceTypeDesc.IsInBlockList(replica.NodeId);

                        // If partition is in upgrade, all replicas are not movable
                        // if PLB config prevent constraint check during application upgrade.
                        // If constraint check is allowed during application upgrade
                        // all replicas are movable except IsPrimaryToBeSwappedOut replica (marked with I) or
                        // primary replica together with IsPrimaryToBePlaced replica (marked with J).
                        bool isMovable = currentRole != ReplicaRole::None && !isInTransition && !fuDesc.IsInQuorumLost;

                        // Allow constraint check fixes for all other replicas except replicas that are currently in app upgrade
                        bool movableDueToAppUpgrade =
                            !fuDesc.IsInUpgrade ||
                            allowConstraintCheckFixesDuringAppUpgrade && !isReplicaInUpgrade;

                        isMovable = isMovable && movableDueToAppUpgrade;

                        // Don't drop extra replicas only from paused node.
                        bool isDroppable = nodeResult != IsPaused;

                        // Check if singleton replica is movable during upgrade
                        bool isSingletonReplicaMovableDuringUpgrade = service.ServiceDesc.TargetReplicaSetSize == 1 ?
                            PlacementReplica::CheckIfSingletonReplicaIsMovableDuringUpgrade(replica) :
                            false;

                        allReplicas_.push_back(make_unique<PlacementReplica>(
                            replicaIndex++,
                            currentRole,
                            isMovable,
                            isDroppable,
                            isInTransition,
                            &(balanceChecker_->Nodes[nodeIndex]),
                            replica.IsMoveInProgress,
                            isReplicaToBeDropped,
                            isReplicaInUpgrade,
                            isSingletonReplicaMovableDuringUpgrade,
                            canBeThrottled));

                        if (currentRole == ReplicaRole::Primary)
                        {
                            primaryReplica = allReplicas_.back().get();
                        }

                        replicas.push_back(allReplicas_.back().get());
                    }
                }
            }
        }

        if (primaryReplica != nullptr && primaryUpgradeLocation != nullptr)
        {
            primaryReplica->IsMovable = false;
        }

        // get service entry
        auto itService = serviceEntryDict_.find(fuDesc.ServiceId);
        if (itService == serviceEntryDict_.end())
        {
            TESTASSERT_IF(itService == serviceEntryDict_.end(), "No such service entry {0} for a partition entry {1}",
                fuDesc.ServiceName,
                fuDesc.FUId);
            serviceDomain_.plb_.Trace.PlacementCreatorServiceNotFound(fuDesc.FUId, fuDesc.ServiceName);
            // Ignore this partition while searching.
            continue;
        }
        ServiceEntry* serviceEntry = itService->second;

        if (serviceEntry->AutoSwitchToQuorumBasedLogic || 
            plb_.Settings.QuorumBasedReplicaDistributionPerFaultDomains ||
            plb_.Settings.QuorumBasedReplicaDistributionPerUpgradeDomains)
        {
            quorumBasedPartitionsCount_++;
        }

        // Is partition in singleton replica upgrade
        bool isSingletonReplicaUpgrade = isPartitionInUpgrade &&
            serviceEntry->TargetReplicaSetSize == 1 &&
            failoverUnit.ActualReplicaDifference == 1 &&
            plb_.isSingletonReplicaMoveAllowedDuringUpgradeEntry_.GetValue();
        bool isSingletonReplicaInUpgradeTransition = serviceEntry->TargetReplicaSetSize == 1 && replicas.size() > 1;


        PartitionEntry::SingletonReplicaUpgradeOptimizationType upgradeOptimization = PartitionEntry::SingletonReplicaUpgradeOptimizationType::None;

        // Set appropriate singleton upgrade optimization
        if ((isSingletonReplicaUpgrade ||
            isSingletonReplicaInUpgradeTransition) &&
            balanceChecker_->Settings.CheckAffinityForUpgradePlacement &&
            ((serviceEntry->DependedService != nullptr &&             // child: has parent service with TRC=1
                serviceEntry->DependedService->TargetReplicaSetSize == 1) ||
                serviceEntry->DependentServices.size() > 0))          // parent: has child services
        {
            // If it is initial upgrade FT update, set the appropriate optimization
            if (isSingletonReplicaUpgrade)
            {
                upgradeOptimization = PartitionEntry::SingletonReplicaUpgradeOptimizationType::CheckAffinityDuringUpgrade;
            }
            serviceEntry->HasAffinityAssociatedSingletonReplicaInUpgrade = true;
        }
        else if ((isSingletonReplicaUpgrade ||
            isSingletonReplicaInUpgradeTransition) &&
            balanceChecker_->Settings.RelaxScaleoutConstraintDuringUpgrade &&
            serviceEntry->Application != nullptr &&
            serviceEntry->Application->ScaleoutCount == 1 &&
            serviceEntry->Application->IsInSingletonReplicaUpgrade(plb_, replicas))
        {
                // If it is initial upgrade FT update, set the appropriate optimization
                if (isSingletonReplicaUpgrade)
                {
                    upgradeOptimization = PartitionEntry::SingletonReplicaUpgradeOptimizationType::RelaxScaleoutDuringUpgrade;
                }
                ApplicationEntry* application = const_cast<ApplicationEntry*>(serviceEntry->Application);
                application->HasPartitionsInSingletonReplicaUpgrade = true;
        }

        int replicaDiff = failoverUnit.ActualReplicaDifference;
        
        bool isScaledToEveryNode = false;
        int minInstanceCount = 0;

        if (service.ServiceDesc.IsAutoScalingDefined 
            && service.ServiceDesc.AutoScalingPolicy.IsPartitionScaled()
            && service.ServiceDesc.AutoScalingPolicy.Mechanism->Kind == ScalingMechanismKind::PartitionInstanceCount)
        {
            InstanceCountScalingMechanismSPtr mechanism = static_pointer_cast<InstanceCountScalingMechanism>(service.ServiceDesc.AutoScalingPolicy.Mechanism);
            if (mechanism->MaximumInstanceCount == -1)
            {
                isScaledToEveryNode = true;
            }
            minInstanceCount = mechanism->MinimumInstanceCount;
        }

        if (INT_MAX == replicaDiff || isScaledToEveryNode && replicaDiff == 0)
        {
            int numReplicasOnBlocklistedNodes = 0;
            NodeSet nodes(static_cast<int>(plb_.nodes_.size()));

            for (auto dIt = balanceChecker_->DeactivatedNodes.begin(); dIt != balanceChecker_->DeactivatedNodes.end(); ++dIt)
            {
                nodes.Add(*dIt);
            }

            for (auto downIt = balanceChecker_->DownNodes.begin(); downIt != balanceChecker_->DownNodes.end(); ++downIt)
            {
                nodes.Add(*downIt);
            }

            // replicaDiff should be recalculated for FTs with target -1, and for FT's that have maxInstanceCount -1 and target is above nonBlocklisted node count
            // If target is below nonBlocklisted node count, replicas on blocklisted nodes should be moved in Constraint check phase.
            if (INT_MAX == replicaDiff || failoverUnit.FuDescription.TargetReplicaSetSize > static_cast<int>(plb_.nodes_.size()) - nodes.GetTotalOneCount())
            {
                serviceEntry->BlockList.ForEach([&](size_t nodeIndex) -> void
                {
                    nodes.Add(static_cast<int>(nodeIndex));
                });

                for (auto rIt = replicas.begin(); rIt != replicas.end(); ++rIt)
                {
                    if (serviceEntry->IsNodeInBlockList((*rIt)->Node))
                    {
                        numReplicasOnBlocklistedNodes++;
                    }

                    nodes.Add((*rIt)->Node);
                }

                replicaDiff = static_cast<int>(plb_.nodes_.size()) - nodes.GetTotalOneCount();

                // Do not place more replicas if we are already over scaleout count
                if (serviceEntry->Application != nullptr
                    && serviceEntry->Application->ScaleoutCount > 0
                    && serviceEntry->Application->ScaleoutCount <= replicas.size())
                {
                    replicaDiff = 0;
                }

                if (replicaDiff < 0)
                {
                    TESTASSERT_IF(replicaDiff < 0, "The calculated replicaDiff {0} shouldn't be less than 0", replicaDiff);
                    serviceDomain_.plb_.Trace.PlacementReplicaDiffError(fuDesc.FUId, replicaDiff);
                    replicaDiff = 0;
                }

                if (0 == replicaDiff)
                {
                    // Drop replicas that are on blocklisted nodes
                    replicaDiff = -numReplicasOnBlocklistedNodes;

                    if (isScaledToEveryNode)
                    {
                        replicaDiff = max(replicaDiff, minInstanceCount - (int)replicas.size());
                    }
                }
            }
        }

        replicaDiff = min(replicaDiff, static_cast<int>(plb_.upNodeCount_));
        int extraReplicaCount = 0;

        if (replicaDiff > 0)
        {
            if (service.ServiceDesc.IsStateful)
            {
                bool havingPrimary = any_of(failoverUnit.FuDescription.Replicas.begin(), failoverUnit.FuDescription.Replicas.end(), [](ReplicaDescription const& r)
                {
                    if (r.CurrentRole == ReplicaRole::Primary && !r.IsToBeDropped)
                    {
                        return true;
                    }
                    else if (r.CurrentRole == ReplicaRole::Primary && r.IsToBeDropped && r.IsPrimaryToBeSwappedOut)
                    {
                        return true;
                    }
                    else
                    {
                        return false;
                    }
                });

                if (!havingPrimary)
                {
                    // Primary build cannot be throttled if there are no replicas in the configuration.
                    bool canPrimaryBeThrottled = canBeThrottled &&
                        any_of(failoverUnit.FuDescription.Replicas.begin(),
                            failoverUnit.FuDescription.Replicas.end(),
                            [](ReplicaDescription const& r) {
                                if (r.CurrentRole == ReplicaRole::Primary || r.CurrentRole == ReplicaRole::Secondary)
                                {
                                    return true;
                                }
                                return false;
                            });
                    allReplicas_.push_back(make_unique<PlacementReplica>(replicaIndex++, ReplicaRole::Primary, canPrimaryBeThrottled));
                    replicas.push_back(allReplicas_.back().get());
                    replicaDiff--;
                }
            }

            while (replicaDiff > 0)
            {
                allReplicas_.push_back(make_unique<PlacementReplica>(replicaIndex++, ReplicaRole::Secondary, service.ServiceDesc.IsStateful));
                replicas.push_back(allReplicas_.back().get());
                replicaDiff--;
            }
        }
        else
        {
            extraReplicaCount = -replicaDiff;
        }

        if (replicas.empty() && partitionStandBy.empty())
        {
            // don't include empty partitions
            continue;
        }

        // set the affinity order by ServiceEntry
        int order = 0;
        ServiceEntry const* dependedServiceEntry = serviceEntry->DependedService;
        while (dependedServiceEntry != nullptr)
        {
            // This is a child service, needs to be sorted after parent
            order += 2;
            // Set parent flag for upgrade as well, hence proper initialization is done,
            // in scenarios when only child partition is in upgrade
            if ((isSingletonReplicaUpgrade ||
                isSingletonReplicaInUpgradeTransition) &&
                balanceChecker_->Settings.CheckAffinityForUpgradePlacement)
            {
                const_cast<ServiceEntry*>(dependedServiceEntry)->HasAffinityAssociatedSingletonReplicaInUpgrade = true;
            }
            dependedServiceEntry = dependedServiceEntry->DependedService;
        }

        if (upReplicaCount >= failoverUnit.FuDescription.TargetReplicaSetSize)
        {
            // If up replica count is not at target, "prioritize" this replica.
            // Final order number will be:
            //  0 - parent not on target, 1 - parent on target, 2 - child not on target, 3 - child on target
            // "Prioritization" is done in searcher, by placing lower order first.
            // Optimization for later: do better ordering (i.e. parents with children on target to the back).
            ++order;
        }

        vector<int64> primaryEntry(failoverUnit.PrimaryEntries.begin(), failoverUnit.PrimaryEntries.end());
        vector<int64> secondaryEntry(failoverUnit.SecondaryEntries.begin(), failoverUnit.SecondaryEntries.end());

        if (isPartitionInUpgrade)
        {
            partitionsInUpgradeCount_++;
        }

        bool aggregatedIsPartitionInUpgrade = isPartitionInUpgrade || fuDesc.IsInUpgrade;

        // If partition is in upgrade, all replicas are not movable.
        // Except if current PLB phase is ConstraintCheck and fixes are allowed during application upgrade.
        bool isPartitionMovable = throttledPartitions_.end() == throttledPartitions_.find(fuDesc.FUId);

        if (!(aggregatedIsPartitionInUpgrade && allowConstraintCheckFixesDuringAppUpgrade))
        {
            isPartitionMovable = isPartitionMovable && !fuDesc.IsInTransition && !aggregatedIsPartitionInUpgrade;
        }

        partitionEntries_.push_back(PartitionEntry(
            fuDesc.FUId,
            fuDesc.Version,
            fuDesc.IsInTransition,
            isPartitionMovable,
            serviceEntry,
            move(replicas),
            LoadEntry(move(primaryEntry)),
            LoadEntry(move(secondaryEntry)),
            failoverUnit.GetMoveCostValue(ReplicaRole::Primary, balanceChecker_->Settings),
            failoverUnit.GetMoveCostValue(ReplicaRole::Secondary, balanceChecker_->Settings),
            move(standByLocations),
            primarySwapOutLocation,
            primaryUpgradeLocation,
            move(secondaryUpgradeLocations),
            isPartitionInUpgrade,
            fuDesc.IsInUpgrade,
            isSingletonReplicaUpgrade,
            upgradeOptimization,
            order,
            extraReplicaCount,
            failoverUnit.SecondaryEntriesMap,
            move(partitionStandBy),
            balanceChecker_->Settings,
            failoverUnit.TargetReplicaSetSize));
    }
}