public void rebalance()

in brooklyn-server/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunStrategy.java [56:150]


    public void rebalance() {
        try {
            Set<ItemType> items = model.getItems();
            Map<ItemType, Map<Location, Double>> directSendsToItemByLocation = model.getDirectSendsToItemByLocation();
            
            for (ItemType item : items) {
                String itemName = model.getName(item);
                Location activeLocation = model.getItemLocation(item);
                ContainerType activeContainer = model.getItemContainer(item);
                Map<Location, Double> sendsByLocation = directSendsToItemByLocation.get(item);
                if (sendsByLocation == null) sendsByLocation = Collections.emptyMap();
                
                if (parameters.excludedLocations.contains(activeLocation)) {
                    if (LOG.isTraceEnabled()) LOG.trace("Ignoring segment {} as it is in {}", itemName, activeLocation);
                    continue;
                }
                if (!model.isItemMoveable(item)) {
                    if (LOG.isDebugEnabled()) LOG.debug("POLICY {} skipping any migration of {}, it is not moveable", name, itemName);
                    continue;
                }
                if (model.hasActiveMigration(item)) {
                    LOG.info("POLICY {} skipping any migration of {}, it is involved in an active migration already", name, itemName);
                    continue;
                }
                
                double total = DefaultFollowTheSunModel.sum(sendsByLocation.values());

                if (LOG.isTraceEnabled()) LOG.trace("POLICY {} detected {} msgs/sec in {}, split up as: {}", new Object[] {name, total, itemName, sendsByLocation});
                
                Double current = sendsByLocation.get(activeLocation);
                if (current == null) current=0d;
                List<WeightedObject<Location>> locationsWtd = new ArrayList<WeightedObject<Location>>();
                if (total > 0) {
                    for (Map.Entry<Location, Double> entry : sendsByLocation.entrySet()) {
                        Location l = entry.getKey();
                        Double d = entry.getValue();
                        if (d > current) locationsWtd.add(new WeightedObject<Location>(l, d));
                    }
                }
                Collections.sort(locationsWtd);
                Collections.reverse(locationsWtd);
                
                double highestMsgRate = -1;
                Location highestLocation = null;
                ContainerType optimalContainerInHighest = null;
                while (!locationsWtd.isEmpty()) {
                    WeightedObject<Location> weightedObject = locationsWtd.remove(0);
                    highestMsgRate = weightedObject.getWeight();
                    highestLocation = weightedObject.getObject();
                    optimalContainerInHighest = findOptimal(model.getAvailableContainersFor(item, highestLocation));
                    if (optimalContainerInHighest != null) {
                        break;
                    }
                }
                if (optimalContainerInHighest == null) {
                    if (LOG.isDebugEnabled()) LOG.debug("POLICY {} detected {} is already in optimal permitted location ({} of {} msgs/sec)", new Object[] {name, itemName, highestMsgRate, total});
                    continue;                   
                }
                
                double nextHighestMsgRate = -1;
                ContainerType optimalContainerInNextHighest = null;
                while (!locationsWtd.isEmpty()) {
                    WeightedObject<Location> weightedObject = locationsWtd.remove(0);
                    nextHighestMsgRate = weightedObject.getWeight();
                    Location nextHighestLocation = weightedObject.getObject();
                    optimalContainerInNextHighest = findOptimal(model.getAvailableContainersFor(item, nextHighestLocation));
                    if (optimalContainerInNextHighest != null) {
                        break;
                    }
                }
                if (optimalContainerInNextHighest == null) {
                    nextHighestMsgRate = current;
                }
                
                if (parameters.isTriggered(highestMsgRate, total, nextHighestMsgRate, current)) {
                    LOG.info("POLICY "+name+" detected "+itemName+" should be in location "+highestLocation+" on "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec), migrating");
                    try {
                        if (activeContainer.equals(optimalContainerInHighest)) {
                            //shouldn't happen
                            LOG.warn("POLICY "+name+" detected "+itemName+" should move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec) but it is already there with "+current+" msgs/sec");
                        } else {
                            item.move(optimalContainerInHighest);
                            model.onItemMoved(item, optimalContainerInHighest);
                        }
                    } catch (Exception e) {
                        LOG.warn("POLICY "+name+" detected "+itemName+" should be on "+optimalContainerInHighest+", but can't move it: "+e, e);
                    }
                } else {
                    if (LOG.isTraceEnabled()) LOG.trace("POLICY "+name+" detected "+itemName+" need not move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec not much better than "+current+" at "+activeContainer+")");
                }
            }
        } catch (Exception e) {
            LOG.warn("Error in policy "+name+" (ignoring): "+e, e);
        }
    }