in brooklyn-server/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java [344:541]
protected boolean balanceItemsOnColdNode(NodeType questionedNode, double questionedNodeTotalWorkrate, boolean gonnaGrow) {
// Abort if the node has pending adjustments.
Map<ItemType, Double> items = getDataProvider().getItemWorkrates(questionedNode);
if (items == null) {
if (LOG.isDebugEnabled()) {
LOG.debug( MessageFormat.format(
"policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
"(workrate {0,number,#.##}); workrate breakdown unavailable (probably reverting)",
questionedNodeTotalWorkrate) );
}
return false;
}
for (ItemType item : items.keySet()) {
if (!model.isItemMoveable(item)) {
if (LOG.isDebugEnabled()) {
LOG.debug( MessageFormat.format(
"policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
"(workrate {0,number,#.##}); at least one item ("+item+") is in flux",
questionedNodeTotalWorkrate) );
}
return false;
}
}
double originalQuestionedNodeTotalWorkrate = questionedNodeTotalWorkrate;
int numMigrations = 0;
Set<ItemType> itemsMoved = new LinkedHashSet<ItemType>();
Set<NodeType> nodesChecked = new LinkedHashSet<NodeType>();
int iters = 0;
Location questionedLocation = getDataProvider().getLocation(questionedNode);
double lowThreshold = model.getLowThreshold(questionedNode);
while (questionedNodeTotalWorkrate < lowThreshold) {
iters++;
if (LOG.isDebugEnabled()) {
LOG.debug( MessageFormat.format(
"policy "+getDataProvider().getName()+" considering balancing cold node "+questionedNode+" " +
"(workrate {0,number,#.##}); iteration "+iters, questionedNodeTotalWorkrate));
}
// move from cold node, to hottest
NodeType hotNode = helper.findHottestContainer(nodesChecked);
if (hotNode == null) {
if (LOG.isDebugEnabled()) {
LOG.debug( MessageFormat.format(
"policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
"(workrate {0,number,#.##}); no hottest node available", questionedNodeTotalWorkrate) );
}
break;
}
if (hotNode.equals(questionedNode)) {
if (LOG.isDebugEnabled()) {
LOG.debug( MessageFormat.format(
"policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
"(workrate {0,number,#.##}); it is also the hottest modfiable node", questionedNodeTotalWorkrate) );
}
break;
}
double hotNodeWorkrate = getDataProvider().getTotalWorkrate(hotNode);
double hotNodeLowThreshold = model.getLowThreshold(hotNode);
double hotNodeHighThreshold = model.getHighThreshold(hotNode);
boolean emergencyLoadBalancing = false; //doesn't apply to cold
if (hotNodeWorkrate == -1 || hotNodeLowThreshold == -1 || hotNodeHighThreshold == -1) {
// hotNode presumably has been removed; TODO log
break;
}
if (hotNodeWorkrate <= hotNodeLowThreshold && !emergencyLoadBalancing) {
//don't balance if all nodes are too low
//for now, stop warning if it is a recurring theme!
// Level level = loggedHottestTooLow ? Level.FINER : Level.INFO;
// LOG.log(level, MessageFormat.format(
// "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " +
// "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low" +
// (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"),
// questionedNodeTotalWorkrate, hotNodeWorkrate) );
// loggedHottestTooLow = true;
break;
}
if (gonnaGrow && (hotNodeWorkrate <= hotNodeHighThreshold && !emergencyLoadBalancing)) {
//if we're growing the pool, refuse to balance unless the hot node is quite hot
//again, stop warning if it is a recurring theme!
// Level level = loggedHottestTooLow ? Level.FINER : Level.INFO;
// LOG.log(level, MessageFormat.format(
// "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " +
// "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low to accept while pool is growing"+
// (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"),
// questionedNodeTotalWorkrate, hotNodeWorkrate) );
// loggedHottestTooLow = true;
break;
}
String questionedNodeName = getDataProvider().getName(questionedNode);
String hotNodeName = getDataProvider().getName(hotNode);
if (LOG.isDebugEnabled()) {
LOG.debug( MessageFormat.format(
"policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
"("+questionedNode+", workrate {0,number,#.##}), " +
"considering source "+hotNodeName+" ("+hotNode+", workrate {1,number,#.##})",
questionedNodeTotalWorkrate, hotNodeWorkrate) );
}
double idealSizeToMove = (hotNodeWorkrate - questionedNodeTotalWorkrate) / 2;
//if the 'ideal' amount to move would cause cold to be too hot, then reduce ideal amount
double targetNodeHighThreshold = model.getHighThreshold(questionedNode);
if (idealSizeToMove + questionedNodeTotalWorkrate > targetNodeHighThreshold)
idealSizeToMove = targetNodeHighThreshold - questionedNodeTotalWorkrate;
double maxSizeToMoveIdeally = Math.min(
hotNodeWorkrate/2,
//allow to swap order, but not very much (0.9 was allowed when balancing high)
(hotNodeWorkrate - questionedNodeTotalWorkrate)*0.6);
double maxSizeToMoveIfNoSmallButLarger = questionedNodeTotalWorkrate*3/4;
Map<ItemType, Double> hotNodeItems = getDataProvider().getItemWorkrates(hotNode);
if (hotNodeItems == null) {
if (LOG.isDebugEnabled())
LOG.debug(MessageFormat.format(
"policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
"("+questionedNode+", workrate {0,number,#.##}), " +
"excluding hot node "+hotNodeName+" because its item report unavailable",
questionedNodeTotalWorkrate));
nodesChecked.add(hotNode);
continue;
}
ItemType itemToMove = findBestItemToMove(hotNodeItems, idealSizeToMove, maxSizeToMoveIdeally,
maxSizeToMoveIfNoSmallButLarger, itemsMoved, questionedLocation);
if (itemToMove == null) {
if (LOG.isDebugEnabled())
LOG.debug(MessageFormat.format(
"policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
"("+questionedNode+", workrate {0,number,#.##}), " +
"excluding hot node "+hotNodeName+" because it has no appilcable items " +
"(ideal transition item size {1,number,#.##}, max {2,number,#.##}, " +
"moving from hot node "+hotNodeName+" ("+hotNode+", workrate {3,number,#.##}); available items: {4}",
questionedNodeTotalWorkrate, idealSizeToMove, maxSizeToMoveIdeally, hotNodeWorkrate, hotNodeItems) );
nodesChecked.add(hotNode);
continue;
}
itemsMoved.add(itemToMove);
double segmentRate = hotNodeItems.get(itemToMove);
// if (LOG.isLoggable(Level.FINE))
// LOG.fine( MessageFormat.format(
// "policy "+getDataProvider().getAbbr()+" balancing cold node "+questionedNodeName+" " +
// "(workrate {0,number,#.##}, too low), transitioning " + itemToMove +
// " from "+hotNodeName+" (workrate {1,number,#.##}, now -= {2,number,#.##})",
// questionedNodeTotalWorkrate, hotNodeWorkrate, segmentRate) );
questionedNodeTotalWorkrate += segmentRate;
hotNodeWorkrate -= segmentRate;
moveItem(itemToMove, hotNode, questionedNode);
if (++numMigrations >= getMaxMigrationsPerBalancingNode()) {
break;
}
}
if (LOG.isDebugEnabled()) {
if (iters == 0) {
if (LOG.isTraceEnabled())
LOG.trace( MessageFormat.format(
"policy "+getDataProvider().getName()+" balancing if cold finished at node "+questionedNode+"; " +
"workrate {0,number,#.##} not cold",
originalQuestionedNodeTotalWorkrate) );
}
else if (itemsMoved.isEmpty()) {
if (LOG.isTraceEnabled())
LOG.trace( MessageFormat.format(
"policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+" " +
"(workrate {0,number,#.##}); no way to improve it",
originalQuestionedNodeTotalWorkrate) );
} else {
LOG.debug( MessageFormat.format(
"policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+"; " +
"workrate from {0,number,#.##} to {1,number,#.##} (report now says {2,number,#.##}) " +
"by moving in {3}",
originalQuestionedNodeTotalWorkrate,
questionedNodeTotalWorkrate,
getDataProvider().getTotalWorkrate(questionedNode),
itemsMoved) );
}
}
return !itemsMoved.isEmpty();
}