protected boolean balanceItemsOnColdNode()

in policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java [344:541]


    protected boolean balanceItemsOnColdNode(NodeType questionedNode, double questionedNodeTotalWorkrate, boolean gonnaGrow) {
        // Abort if the node has pending adjustments.
        Map<ItemType, Double> items = getDataProvider().getItemWorkrates(questionedNode);
        if (items == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug( MessageFormat.format(
                        "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
                        "(workrate {0,number,#.##}); workrate breakdown unavailable (probably reverting)",
                        questionedNodeTotalWorkrate) );
            }
            return false;
        }
        for (ItemType item : items.keySet()) {
            if (!model.isItemMoveable(item)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug( MessageFormat.format(
                            "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
                            "(workrate {0,number,#.##}); at least one item ("+item+") is in flux",
                            questionedNodeTotalWorkrate) );
                }
                return false;
            }
        }
        
        double originalQuestionedNodeTotalWorkrate = questionedNodeTotalWorkrate;
        int numMigrations = 0;
        
        Set<ItemType> itemsMoved = new LinkedHashSet<ItemType>();
        Set<NodeType> nodesChecked = new LinkedHashSet<NodeType>();
        
        int iters = 0;
        Location questionedLocation = getDataProvider().getLocation(questionedNode);
        
        double lowThreshold = model.getLowThreshold(questionedNode);
        while (questionedNodeTotalWorkrate < lowThreshold) {
            iters++;
            
            if (LOG.isDebugEnabled()) {
                LOG.debug( MessageFormat.format(
                        "policy "+getDataProvider().getName()+" considering balancing cold node "+questionedNode+" " +
                        "(workrate {0,number,#.##}); iteration "+iters, questionedNodeTotalWorkrate));
            }
            
            // move from cold node, to hottest
            
            NodeType hotNode = helper.findHottestContainer(nodesChecked);
            
            if (hotNode == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug( MessageFormat.format(
                            "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
                            "(workrate {0,number,#.##}); no hottest node available", questionedNodeTotalWorkrate) );
                }
                
                break;
            }
            if (hotNode.equals(questionedNode)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug( MessageFormat.format(
                            "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " +
                            "(workrate {0,number,#.##}); it is also the hottest modfiable node", questionedNodeTotalWorkrate) );
                }
                break;
            }
            
            
            double hotNodeWorkrate = getDataProvider().getTotalWorkrate(hotNode);
            double hotNodeLowThreshold = model.getLowThreshold(hotNode);
            double hotNodeHighThreshold = model.getHighThreshold(hotNode);
            boolean emergencyLoadBalancing = false;  //doesn't apply to cold
            if (hotNodeWorkrate == -1 || hotNodeLowThreshold == -1 || hotNodeHighThreshold == -1) {
                // hotNode presumably has been removed; TODO log
                break;
            }
            if (hotNodeWorkrate <= hotNodeLowThreshold && !emergencyLoadBalancing) {
                //don't balance if all nodes are too low
                
                //for now, stop warning if it is a recurring theme!
//                Level level = loggedHottestTooLow ? Level.FINER : Level.INFO;
//                LOG.log(level, MessageFormat.format(
//                        "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " +
//                        "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low" +
//                        (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"),
//                        questionedNodeTotalWorkrate, hotNodeWorkrate) );
//                loggedHottestTooLow = true;
                break;
            }
            if (gonnaGrow && (hotNodeWorkrate <= hotNodeHighThreshold && !emergencyLoadBalancing)) {
                //if we're growing the pool, refuse to balance unless the hot node is quite hot
                
                //again, stop warning if it is a recurring theme!
//                Level level = loggedHottestTooLow ? Level.FINER : Level.INFO;
//                LOG.log(level, MessageFormat.format(
//                        "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " +
//                        "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low to accept while pool is growing"+
//                        (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"),
//                        questionedNodeTotalWorkrate, hotNodeWorkrate) );
//                loggedHottestTooLow = true;
                break;
            }
            
            String questionedNodeName = getDataProvider().getName(questionedNode);
            String hotNodeName = getDataProvider().getName(hotNode);
            
            if (LOG.isDebugEnabled()) {
                LOG.debug( MessageFormat.format(
                        "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
                        "("+questionedNode+", workrate {0,number,#.##}), " +
                        "considering source "+hotNodeName+" ("+hotNode+", workrate {1,number,#.##})",
                        questionedNodeTotalWorkrate, hotNodeWorkrate) );
            }
            
            double idealSizeToMove = (hotNodeWorkrate - questionedNodeTotalWorkrate) / 2;
            //if the 'ideal' amount to move would cause cold to be too hot, then reduce ideal amount
            double targetNodeHighThreshold = model.getHighThreshold(questionedNode);
            if (idealSizeToMove + questionedNodeTotalWorkrate > targetNodeHighThreshold)
                idealSizeToMove = targetNodeHighThreshold - questionedNodeTotalWorkrate;
            double maxSizeToMoveIdeally = Math.min(
                    hotNodeWorkrate/2,
                    //allow to swap order, but not very much (0.9 was allowed when balancing high)
                    (hotNodeWorkrate - questionedNodeTotalWorkrate)*0.6);
            double maxSizeToMoveIfNoSmallButLarger = questionedNodeTotalWorkrate*3/4;
            
            Map<ItemType, Double> hotNodeItems = getDataProvider().getItemWorkrates(hotNode);
            if (hotNodeItems == null) {
                if (LOG.isDebugEnabled())
                    LOG.debug(MessageFormat.format(
                            "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
                            "("+questionedNode+", workrate {0,number,#.##}), " +
                            "excluding hot node "+hotNodeName+" because its item report unavailable",
                            questionedNodeTotalWorkrate));
                nodesChecked.add(hotNode);
                continue;
            }
            
            ItemType itemToMove = findBestItemToMove(hotNodeItems, idealSizeToMove, maxSizeToMoveIdeally,
                    maxSizeToMoveIfNoSmallButLarger, itemsMoved, questionedLocation);
            if (itemToMove == null) {
                if (LOG.isDebugEnabled())
                    LOG.debug(MessageFormat.format(
                            "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " +
                            "("+questionedNode+", workrate {0,number,#.##}), " +
                            "excluding hot node "+hotNodeName+" because it has no appilcable items " +
                            "(ideal transition item size {1,number,#.##}, max {2,number,#.##}, " +
                            "moving from hot node "+hotNodeName+" ("+hotNode+", workrate {3,number,#.##}); available items: {4}",
                            questionedNodeTotalWorkrate, idealSizeToMove, maxSizeToMoveIdeally, hotNodeWorkrate, hotNodeItems) );
                
                nodesChecked.add(hotNode);
                continue;
            }
            
            itemsMoved.add(itemToMove);
            double segmentRate = hotNodeItems.get(itemToMove);
            
//            if (LOG.isLoggable(Level.FINE))
//                LOG.fine( MessageFormat.format(
//                        "policy "+getDataProvider().getAbbr()+" balancing cold node "+questionedNodeName+" " +
//                        "(workrate {0,number,#.##}, too low), transitioning " + itemToMove +
//                        " from "+hotNodeName+" (workrate {1,number,#.##}, now -= {2,number,#.##})",
//                        questionedNodeTotalWorkrate, hotNodeWorkrate, segmentRate) );
            
            questionedNodeTotalWorkrate += segmentRate;
            hotNodeWorkrate -= segmentRate;
            
            moveItem(itemToMove, hotNode, questionedNode);
            
            if (++numMigrations >= getMaxMigrationsPerBalancingNode()) {
                break;
            }
        }
        
        if (LOG.isDebugEnabled()) {
            if (iters == 0) {
                if (LOG.isTraceEnabled())
                    LOG.trace( MessageFormat.format(
                            "policy "+getDataProvider().getName()+" balancing if cold finished at node "+questionedNode+"; " +
                            "workrate {0,number,#.##} not cold",
                            originalQuestionedNodeTotalWorkrate) );
            }
            else if (itemsMoved.isEmpty()) {
                if (LOG.isTraceEnabled())
                    LOG.trace( MessageFormat.format(
                            "policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+" " +
                            "(workrate {0,number,#.##}); no way to improve it",
                            originalQuestionedNodeTotalWorkrate) );
            } else {
                LOG.debug( MessageFormat.format(
                        "policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+"; " +
                        "workrate from {0,number,#.##} to {1,number,#.##} (report now says {2,number,#.##}) " +
                        "by moving in {3}",
                        originalQuestionedNodeTotalWorkrate,
                        questionedNodeTotalWorkrate,
                        getDataProvider().getTotalWorkrate(questionedNode),
                        itemsMoved) );
            }
        }
        return !itemsMoved.isEmpty();
    }