protected int expireOverCapacityTagsInCategory()

in brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java [489:587]


    /**
     * Deletes completed tasks all of whose tags in {@code category} are currently
     * listed as over capacity, visiting candidates in the order given by
     * {@code TASKS_OLDEST_FIRST_COMPARATOR}.
     * <p>
     * A completed task that carries both over-capacity and other tags in the category
     * is not deleted; instead the counter of each of its over-capacity tags is
     * decremented, and a tag is dropped from the over-capacity map once its counter
     * reaches zero or below.
     *
     * @param taskTagsInCategoryOverCapacity tag-to-counter map of the tags in this category
     *        treated as over capacity; modified in place (entries are removed as tags stop
     *        being over capacity)
     * @param taskAllTagsOverCapacity tag-to-counter map consulted after each deletion; the
     *        counter of every tag on the deleted task is decremented.
     *        NOTE(review): presumably shares its counter instances with
     *        {@code taskTagsInCategoryOverCapacity} - confirm against the caller
     * @param category decides, via {@code acceptsTag}, which tags are considered
     * @param emptyFilterNeeded when true, entries whose counter is {@code <= 0} are removed
     *        from {@code taskTagsInCategoryOverCapacity} before anything else happens
     * @return the number of tasks deleted; 0 when no tag is (or remains) over capacity
     */
    protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) {
        if (emptyFilterNeeded) {
            // counters may have been driven down by an earlier pass; collect the
            // recovered tags first, then remove them, so the map is not modified mid-iteration
            List<Object> recoveredTags = MutableList.of();
            for (Map.Entry<Object, AtomicInteger> bucket : taskTagsInCategoryOverCapacity.entrySet()) {
                if (bucket.getValue().get() <= 0) {
                    recoveredTags.add(bucket.getKey());
                }
            }
            for (Object recovered : recoveredTags) {
                taskTagsInCategoryOverCapacity.remove(recovered);
            }
        }

        if (taskTagsInCategoryOverCapacity.isEmpty()) {
            return 0;
        }

        Collection<Task<?>> liveTasks = executionManager.allTasksLive();
        List<Task<?>> candidates = MutableList.of();
        try {
            for (Task<?> liveTask : liveTasks) {
                // only completed tasks are ever eligible
                if (!liveTask.isDone()) {
                    continue;
                }

                Set<Object> taskTags = liveTask.getTags();

                // count this task's tags in the category, and how many of those are over capacity
                int inCategory = 0;
                int inCategoryAndFull = 0;
                for (Object tag : taskTags) {
                    if (!category.acceptsTag(tag)) {
                        continue;
                    }
                    inCategory++;
                    if (taskTagsInCategoryOverCapacity.containsKey(tag)) {
                        inCategoryAndFull++;
                    }
                }

                if (inCategoryAndFull == 0) {
                    // touches no over-capacity tag in this category
                    continue;
                }
                if (inCategoryAndFull == inCategory) {
                    // every category tag on this task is over capacity: candidate for deletion
                    candidates.add(liveTask);
                    continue;
                }

                // the task also has a category tag that is not over capacity, so it is kept;
                // each over-capacity tag it carries gets one unit of grace
                for (Object tag : taskTags) {
                    if (!category.acceptsTag(tag)) {
                        continue;
                    }
                    AtomicInteger excess = taskTagsInCategoryOverCapacity.get(tag);
                    if (excess == null) {
                        continue;
                    }
                    if (excess.decrementAndGet() > 0) {
                        continue;
                    }
                    // grace used up the excess: this tag is no longer over capacity
                    taskTagsInCategoryOverCapacity.remove(tag);
                    if (taskTagsInCategoryOverCapacity.isEmpty()) {
                        return 0;
                    }
                }
            }

        } catch (ConcurrentModificationException e) {
            // unclear whether these structures can throw a CME;
            // if they do, carry on with whatever candidates were gathered so far
            LOG.debug("Got CME inspecting tasks, with "+candidates.size()+" found for deletion: "+e);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("brooklyn-gc detected "+taskTagsInCategoryOverCapacity.size()+" "+category+" "
                    + "tags over capacity, expiring old tasks; "
                    + candidates.size()+" tasks under consideration; categories are: "
                    + taskTagsInCategoryOverCapacity);
        }

        Collections.sort(candidates, TASKS_OLDEST_FIRST_COMPARATOR);

        // walk the sorted candidates, deleting each one whose category tags are all still over capacity
        int deleted = 0;
        for (Task<?> candidate : candidates) {
            boolean eligible = true;
            for (Object tag : candidate.getTags()) {
                if (category.acceptsTag(tag) && taskTagsInCategoryOverCapacity.get(tag) == null) {
                    // an earlier deletion already brought this tag back under capacity
                    eligible = false;
                    break;
                }
            }
            if (!eligible) {
                continue;
            }

            deleted++;
            executionManager.deleteTask(candidate);

            // account for the deletion against every tag the task carried
            for (Object tag : candidate.getTags()) {
                AtomicInteger remaining = taskAllTagsOverCapacity.get(tag);
                if (remaining != null && remaining.decrementAndGet() <= 0) {
                    taskTagsInCategoryOverCapacity.remove(tag);
                }
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("brooklyn-gc deleted "+candidate+", buckets now "+taskTagsInCategoryOverCapacity);
            }
            if (taskTagsInCategoryOverCapacity.isEmpty()) {
                break;
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; "
                    + "capacities now: " + taskTagsInCategoryOverCapacity);
        }
        return deleted;
    }