in src/prod/src/Reliability/LoadBalancing/PlacementAndLoadBalancing.cpp [572:1006]
// Adds a new application to the application table, or updates the description of an existing one.
//
// Parameters:
//   applicationDescription - description to store. If its ApplicationId is 0, an ID is looked up
//                            (or newly assigned) by application name. The object is moved from
//                            inside this function, so it must not be read after the move points.
//   forceUpdate            - when true, the CheckClusterResourceForReservation check is skipped.
//
// Returns:
//   ErrorCodeValue::PLBNotReady                 - if IsDisposed() is true.
//   ErrorCodeValue::InsufficientClusterCapacity - if !forceUpdate and the reservation check fails.
//   ErrorCodeValue::Success                     - otherwise.
//
// lock_ is acquired (AcquireWriteLock) for everything after the initial disposed check.
Common::ErrorCode PlacementAndLoadBalancing::UpdateApplication(ApplicationDescription && applicationDescription, bool forceUpdate)
{
    if (IsDisposed())
    {
        return ErrorCodeValue::PLBNotReady;
    }

    wstring applicationName = applicationDescription.Name;

    AcquireWriteLock grab(lock_);

    if (applicationDescription.ApplicationId == 0)
    {
        auto appIter = applicationToIdMap_.find(applicationName);
        if (appIter == applicationToIdMap_.end())
        {
            // This is new application, so we need to assign new ID to it.
            appIter = applicationToIdMap_.insert(make_pair(applicationName, nextApplicationId_++)).first;
        }
        applicationDescription.ApplicationId = appIter->second;
    }

    // Captured up front: applicationDescription is moved from further down,
    // so the ID must not be read from it after that point.
    uint64 applicationId = applicationDescription.ApplicationId;

    bool changed = false;
    bool appGroupsAdded = false;
    bool appGroupsRemoved = false;
    bool scaleoutChanged = false;
    bool maxInstanceCapChanged = false;

    vector<wstring> applicationMetrics;
    vector<wstring> oldApplicationMetrics;

    if (!forceUpdate && !CheckClusterResourceForReservation(applicationDescription))
    {
        return ErrorCodeValue::InsufficientClusterCapacity;
    }

    auto itApplication = applicationTable_.find(applicationId);
    if (itApplication == applicationTable_.end())
    {
        // This is a new application.
        // Add to total reserved capacity
        UpdateAppReservedCapacity(applicationDescription, true);

        // Add to application table.
        // The key is the previously captured applicationId so that the ID is never read
        // from the description in the same expression that moves it.
        itApplication = applicationTable_.insert(make_pair(applicationId, Application(move(applicationDescription)))).first;

        // From here on applicationDescription is moved-from; only the stored description is used.
        auto const& storedDescription = itApplication->second.ApplicationDesc;

        if (applicationTable_.size() == 1)
        {
            // First application in the table: reset the tracing cursors to it.
            nextApplicationToBeTraced_ = applicationTable_.begin()->first;
            for (auto it = serviceDomainTable_.begin(); it != serviceDomainTable_.end(); ++it)
            {
                it->second.LastApplicationTraced = applicationTable_.begin()->first;
            }
        }

        changed = true;

        if (storedDescription.HasScaleoutOrCapacity())
        {
            appGroupsAdded = true;
            map<wstring, ApplicationCapacitiesDescription> const& appCapacities = storedDescription.AppCapacities;
            for (auto const& capacity : appCapacities)
            {
                applicationMetrics.push_back(capacity.second.MetricName);
            }
        }

        bool anyPackageChanged = false;
        for (auto const& spIt : storedDescription.ServicePackages)
        {
            bool packagedChanged = false;
            InternalUpdateServicePackageCallerHoldsLock(ServicePackageDescription(spIt.second), packagedChanged);
            anyPackageChanged = anyPackageChanged || packagedChanged;
        }

        // This is a new application but it might be already in upgrade, so we need to check for resources in that case.
        if (storedDescription.UpgradeInProgess && anyPackageChanged && storedDescription.IsPLBSafetyCheckRequiredForUpgrade)
        {
            Trace.ApplicationInRGChange(storedDescription);
            applicationsInUpgradeCheckRg_.insert(applicationId);
        }
    }
    else
    {
        // Check which SPs need to be changed.
        set<ServiceModel::ServicePackageIdentifier> updatedSPs;
        set<ServiceModel::ServicePackageIdentifier> deletedSPs;
        bool anyPackageChanged = false;

        itApplication->second.GetChangedServicePackages(applicationDescription, deletedSPs, updatedSPs);

        for (auto deletedSPId : deletedSPs)
        {
            InternalDeleteServicePackageCallerHoldsLock(deletedSPId);
        }

        for (auto updatedSPId : updatedSPs)
        {
            auto updatedSpDesc = applicationDescription.ServicePackages.find(updatedSPId);
            ASSERT_IF(updatedSpDesc == applicationDescription.ServicePackages.end(),
                "Service Package {0} not found in applications SPs. Application description: {1}",
                updatedSPId,
                applicationDescription);
            bool packageChanged = false;
            InternalUpdateServicePackageCallerHoldsLock(ServicePackageDescription(updatedSpDesc->second), packageChanged);
            anyPackageChanged = anyPackageChanged || packageChanged;
        }

        // If there is an ongoing upgrade and we have some packages that have changed we need to queue a safety check.
        // This will happen only the first time we receive an update for an application that is changing RG, so we
        // will not queue it more than once because we will save the new resource governance policy with the first update.
        if (applicationDescription.UpgradeInProgess && anyPackageChanged && applicationDescription.IsPLBSafetyCheckRequiredForUpgrade)
        {
            Trace.ApplicationInRGChange(applicationDescription);
            applicationsInUpgradeCheckRg_.insert(applicationId);
        }

        // This app is no longer in upgrade, so if it was canceled before we finished the safety check
        // there is no need to keep checking for it.
        if (!applicationDescription.UpgradeInProgess)
        {
            applicationsInUpgradeCheckRg_.erase(applicationId);
        }

        // Application already exists
        // Delete the old app reserved load
        UpdateAppReservedCapacity(itApplication->second.ApplicationDesc, false);
        // Add reserved load in the new app description
        UpdateAppReservedCapacity(applicationDescription, true);

        // Per-metric reservation / max-instance capacity, keyed by metric name.
        // Only strictly positive values are recorded.
        std::map<wstring, uint64> applicationReservationMetrics;
        std::map<wstring, uint64> oldApplicationReservationMetrics;
        std::map<wstring, uint64> applicationMaxInstanceCapMetrics;
        std::map<wstring, uint64> oldApplicationMaxInstanceCapMetrics;

        // Collect metrics from the currently stored (old) description.
        if (itApplication->second.ApplicationDesc.HasScaleoutOrCapacity())
        {
            if (!applicationDescription.HasScaleoutOrCapacity())
            {
                appGroupsRemoved = true;
            }

            map<wstring, ApplicationCapacitiesDescription> const& oldAppCapacities = itApplication->second.ApplicationDesc.AppCapacities;
            for (auto const& capacity : oldAppCapacities)
            {
                oldApplicationMetrics.push_back(capacity.second.MetricName);

                // add old reservation for this metric
                if (capacity.second.ReservationCapacity > 0)
                {
                    oldApplicationReservationMetrics.insert(make_pair(capacity.second.MetricName, capacity.second.ReservationCapacity));
                }

                // add old max instance capacity for this metric
                if (capacity.second.MaxInstanceCapacity > 0)
                {
                    oldApplicationMaxInstanceCapMetrics.insert(make_pair(capacity.second.MetricName, capacity.second.MaxInstanceCapacity));
                }
            }
        }

        // Collect metrics from the incoming (new) description.
        if (applicationDescription.HasScaleoutOrCapacity())
        {
            // If the application description has changes of scaleout or capacity
            // Placement creator can do the update
            if (!itApplication->second.ApplicationDesc.HasScaleoutOrCapacity())
            {
                appGroupsAdded = true;
            }

            map<wstring, ApplicationCapacitiesDescription> const& appCapacities = applicationDescription.AppCapacities;
            for (auto const& capacity : appCapacities)
            {
                applicationMetrics.push_back(capacity.second.MetricName);

                // add new reservation for this metric
                if (capacity.second.ReservationCapacity > 0)
                {
                    applicationReservationMetrics.insert(make_pair(capacity.second.MetricName, capacity.second.ReservationCapacity));
                }

                // add max instance capacity for this metric
                if (capacity.second.MaxInstanceCapacity > 0)
                {
                    applicationMaxInstanceCapMetrics.insert(make_pair(capacity.second.MetricName, capacity.second.MaxInstanceCapacity));
                }
            }
        }

        // A reservation changed if a metric was added, removed, or has a different value.
        // That is exactly inequality of the two maps.
        // If reservation was changed: remove FailoverUnits reservation, change description, then add it back.
        bool changedReservationMetric = (applicationReservationMetrics != oldApplicationReservationMetrics);

        // Same comparison for the max instance capacity per metric.
        maxInstanceCapChanged = (applicationMaxInstanceCapMetrics != oldApplicationMaxInstanceCapMetrics);

        // check if number of minimum nodes has changed
        bool isMinNodesChanged = itApplication->second.ApplicationDesc.MinimumNodes != applicationDescription.MinimumNodes;
        scaleoutChanged = itApplication->second.ApplicationDesc.ScaleoutCount != applicationDescription.ScaleoutCount;

        auto sdIterator = applicationToDomainTable_.find(applicationId);
        if (sdIterator != applicationToDomainTable_.end())
        {
            // remove old reservation load when minNodes or application reservation is changed
            if (changedReservationMetric)
            {
                // Delete all failoverUnits reservation for old application
                // No need to update UpdateApplicationReservedLoad - already done that here
                sdIterator->second->second.UpdateApplicationFailoverUnitsReservation(itApplication->second.ApplicationDesc, false);
            }
            else if (isMinNodesChanged)
            {
                // Remove the existing reservation
                sdIterator->second->second.UpdateApplicationReservedLoad(itApplication->second.ApplicationDesc, false);
            }
        }

        // Change application description.
        // A copy is kept because applicationDescription is handed over as an rvalue below
        // and the new values are still needed to re-add the reservation.
        ApplicationDescription newDescription = applicationDescription;
        changed = itApplication->second.UpdateDescription(move(applicationDescription));

        if (sdIterator != applicationToDomainTable_.end())
        {
            // add new reservation load when minNodes or application reservation is changed
            if (changedReservationMetric)
            {
                // Add all failoverUnits reservation for new application
                // No need to update UpdateApplicationReservedLoad - already done that here
                sdIterator->second->second.UpdateApplicationFailoverUnitsReservation(newDescription, true);
            }
            else if (isMinNodesChanged)
            {
                // Add the new reservation
                sdIterator->second->second.UpdateApplicationReservedLoad(newDescription, true);
            }
        }
    }

    if (changed)
    {
        Trace.UpdateApplication(itApplication->second.ApplicationDesc.Name, itApplication->second.ApplicationDesc);

        bool const metricsDiffer = (applicationMetrics != oldApplicationMetrics);

        if (appGroupsAdded || appGroupsRemoved || metricsDiffer)
        {
            // Update the metric Connections
            // Metric connections are as follows: each service is connected to the one alphabetically before it,
            // and the first service is connected to the application's capacity metrics...
            uint64 relatedServiceId(0);
            bool toExecuteDomainChange = false;

            if (metricsDiffer)
            {
                toExecuteDomainChange |= metricConnections_.AddOrRemoveMetricConnection(applicationMetrics, true);
                // general policy is all removes after all adds, but this is kept here,
                // since it shouldn't split any domains that are instantly merged
                toExecuteDomainChange |= metricConnections_.AddOrRemoveMetricConnection(oldApplicationMetrics, false);
            }

            auto const& services = itApplication->second.Services;
            // NOTE(review): itAdjacentService is only assigned when a service is found in
            // serviceToDomainTable_. If the first entry of services has no ID or no domain,
            // a later iteration reads it unassigned - confirm this cannot happen.
            Uint64UnorderedMap<Service>::const_iterator itAdjacentService;

            for (auto itServiceName = services.begin(); itServiceName != services.end(); ++itServiceName)
            {
                auto const serviceId = GetServiceId(*itServiceName);
                if (serviceId != 0)
                {
                    auto itServiceDomain = serviceToDomainTable_.find(serviceId);
                    if (itServiceDomain != serviceToDomainTable_.end())
                    {
                        auto itService = itServiceDomain->second->second.Services.find(serviceId);
                        auto const& serviceDesc = itService->second.ServiceDesc;

                        if (itServiceName == services.begin())
                        {
                            if (metricsDiffer)
                            {
                                // The first service is connected to the application's capacity metrics...
                                toExecuteDomainChange |= metricConnections_.AddOrRemoveMetricAffinity(serviceDesc.Metrics, applicationMetrics, true);
                                toExecuteDomainChange |= metricConnections_.AddOrRemoveMetricAffinity(serviceDesc.Metrics, oldApplicationMetrics, false);
                            }
                            itAdjacentService = itService;
                        }
                        else
                        {
                            auto const& adjacentServiceDesc = itAdjacentService->second.ServiceDesc;

                            // Each service is connected to the one alphabetically before it
                            if (appGroupsAdded)
                            {
                                toExecuteDomainChange |= metricConnections_.AddOrRemoveMetricAffinity(serviceDesc.Metrics, adjacentServiceDesc.Metrics, true);
                            }
                            else if (appGroupsRemoved)
                            {
                                toExecuteDomainChange |= metricConnections_.AddOrRemoveMetricAffinity(serviceDesc.Metrics, adjacentServiceDesc.Metrics, false);
                            }
                            else
                            {
                                // Only the application metrics changed; service-to-service links are untouched.
                                break;
                            }
                            itAdjacentService = itService;
                        }
                    }
                    relatedServiceId = serviceId;
                }
            }

            if (toExecuteDomainChange)
            {
                ExecuteDomainChange();
            }

            if (relatedServiceId != 0)
            {
                auto iter = serviceToDomainTable_.find(relatedServiceId);
                if (iter != serviceToDomainTable_.end())
                {
                    bool newDomain = AddApplicationToDomain(itApplication->second.ApplicationDesc, iter->second, oldApplicationMetrics, true);
                    if (newDomain)
                    {
                        // Handle the case when application is added to domain after its services
                        auto const sdIterator = applicationToDomainTable_.find(applicationId);
                        if (sdIterator != applicationToDomainTable_.end() && itApplication->second.ApplicationDesc.AppCapacities.size() > 0)
                        {
                            sdIterator->second->second.UpdateReservationForNewApplication(itApplication->second.ApplicationDesc);
                        }
                    }
                }
            }
        }

        // Remove the application group partitions from the service domain table
        if (appGroupsRemoved)
        {
            auto sdIterator = applicationToDomainTable_.find(applicationId);
            if (sdIterator != applicationToDomainTable_.end())
            {
                sdIterator->second->second.RemoveApplicationFromApplicationPartitions(applicationId);
            }
        }

        // Update constraint check closure, with partitions from the application group
        if ((scaleoutChanged || maxInstanceCapChanged) && !appGroupsRemoved)
        {
            auto sdIterator = applicationToDomainTable_.find(applicationId);
            if (sdIterator != applicationToDomainTable_.end())
            {
                sdIterator->second->second.AddAppPartitionsToPartialClosure(applicationId);
            }
        }

        // applicationDescription has been moved from on every path that reaches this point,
        // so the stored description is queried instead of the parameter.
        if (itApplication->second.ApplicationDesc.HasScaleoutOrCapacity())
        {
            StopSearcher();
        }
    }

    return ErrorCodeValue::Success;
}