protected int expireOverCapacityTagsInCategory()

in core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java [598:717]


    /**
     * Deletes completed tasks whose tags in the given {@code category} are all over capacity,
     * considering them in the order given by {@code TASKS_OLDEST_FIRST_COMPARATOR}, and stopping
     * as soon as no tag remains in {@code taskTagsInCategoryOverCapacity}.
     * <p>
     * A task is only a candidate for deletion if {@code task.isDone(true)} and every one of its
     * tags accepted by {@code category} is a key of {@code taskTagsInCategoryOverCapacity}.
     * A task which has some over-capacity tags in the category but also at least one tag in the
     * category which is not over capacity is kept, and the counters of its over-capacity tags are
     * decremented ("grace"), removing any tag whose counter drops to zero or below.
     *
     * @param taskTagsInCategoryOverCapacity tags of this category currently treated as over capacity,
     *        mapped to a counter (presumably the number of tasks by which the tag exceeds its limit --
     *        confirm against the caller); this map is mutated, entries being removed as counters reach
     *        zero or below
     * @param taskAllTagsOverCapacity counters looked up for every tag of each deleted task and
     *        decremented on deletion (presumably shares counter instances with
     *        {@code taskTagsInCategoryOverCapacity} -- confirm against the caller)
     * @param category the tag category whose tags are examined; tags it does not accept are ignored
     *        when deciding whether to delete a task
     * @param emptyFilterNeeded if true, entries of {@code taskTagsInCategoryOverCapacity} whose counter
     *        is already zero or below are dropped before anything else is done
     * @return the number of tasks deleted by this invocation
     */
    protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) {
        if (emptyFilterNeeded) {
            // previous run may have decremented counts, leaving tags which are no longer over capacity
            taskTagsInCategoryOverCapacity.entrySet().removeIf(entry -> entry.getValue().get()<=0);
        }

        if (taskTagsInCategoryOverCapacity.isEmpty()) {
            return 0;
        }

        // TODO Skip tasks that will be evicted anyway (transient, expired)
        // https://issues.apache.org/jira/browse/BROOKLYN-401
        Collection<Task<?>> tasks = executionManager.allTasksLive();
        List<Task<?>> tasksToConsiderDeleting = MutableList.of();
        try {
            for (Task<?> task: tasks) {
                // only completed tasks are eligible
                if (!task.isDone(true)) continue;

                Set<Object> tags = TaskTags.getTagsFast(task);

                // count how many of this task's tags are in the category, and how many of those are over capacity
                int categoryTags = 0, tooFullCategoryTags = 0;
                for (Object tag: tags) {
                    if (category.acceptsTag(tag)) {
                        categoryTags++;
                        if (taskTagsInCategoryOverCapacity.containsKey(tag))
                            tooFullCategoryTags++;
                    }
                }
                if (tooFullCategoryTags>0) {
                    if (categoryTags==tooFullCategoryTags) {
                        // all buckets are full, delete this one
                        tasksToConsiderDeleting.add(task);
                    } else {
                        // if any bucket is under capacity, then give grace to the other buckets in this category
                        for (Object tag: tags) {
                            if (category.acceptsTag(tag)) {
                                AtomicInteger over = taskTagsInCategoryOverCapacity.get(tag);
                                if (over!=null) {
                                    if (over.decrementAndGet()<=0) {
                                        // and remove it from over-capacity if so
                                        taskTagsInCategoryOverCapacity.remove(tag);
                                        // nothing left over capacity means nothing to delete
                                        if (taskTagsInCategoryOverCapacity.isEmpty())
                                            return 0;
                                    }
                                }
                            }
                        }
                    }
                }
            }

        } catch (ConcurrentModificationException e) {
            // do CME's happen with these data structures?
            // if so, let's just delete what we've found so far
            LOG.debug("Got CME inspecting tasks, with "+tasksToConsiderDeleting.size()+" found for deletion: "+e);
        }

        if (tasksToConsiderDeleting.isEmpty()) {
            return 0;
        }

        Collections.sort(tasksToConsiderDeleting, TASKS_OLDEST_FIRST_COMPARATOR);

        if (LOG.isDebugEnabled()) {
            // start from an empty list, so that for large candidate sets only the
            // first five and last five tasks are logged (and nothing is listed twice)
            List<Object> tasksToLog = MutableList.of();
            if (tasksToConsiderDeleting.size()>10) {
                tasksToConsiderDeleting.stream().limit(5).forEach(tasksToLog::add);
                tasksToLog.add("...");
                tasksToConsiderDeleting.stream().skip(tasksToConsiderDeleting.size()-5).forEach(tasksToLog::add);
            } else {
                tasksToLog.addAll(tasksToConsiderDeleting);
            }
            LOG.debug("brooklyn-gc detected " + taskTagsInCategoryOverCapacity.size() + " " + category + " "
                    + "tag(s) over capacity, expiring old tasks; "
                    + tasksToConsiderDeleting.size() + " tasks under consideration; categories are: "
                    + taskTagsInCategoryOverCapacity + "; including " + tasksToLog);
        }

        // now try deleting tasks which are overcapacity for each (non-entity) tag
        Set<Task<?>> deleted = MutableSet.of();
        for (Task<?> task: tasksToConsiderDeleting) {
            // re-check against the current over-capacity map, as earlier deletions may have emptied some buckets
            boolean delete = false;
            for (Object tag: task.getTags()) {
                if (!category.acceptsTag(tag)) {
                    // ignore this tag, not right for the category
                    continue;
                }
                if (taskTagsInCategoryOverCapacity.get(tag)==null) {
                    // no longer over capacity in this tag
                    delete = false;
                    break;
                }
                // has at least one tag in the category, and all such tags are overcapacity
                delete = true;
            }
            if (delete) {
                // delete this and update overcapacity info
                deleted.add(task);
                executionManager.deleteTask(task);
                for (Object tag: task.getTags()) {
                    AtomicInteger counter = taskAllTagsOverCapacity.get(tag);
                    if (counter!=null && counter.decrementAndGet()<=0)
                        taskTagsInCategoryOverCapacity.remove(tag);
                }
                if (LOG.isTraceEnabled())
                    LOG.trace("brooklyn-gc deleted "+task+", buckets now "+taskTagsInCategoryOverCapacity);
                // stop as soon as every bucket in this category is back within capacity
                if (taskTagsInCategoryOverCapacity.isEmpty())
                    break;
            }
        }

        if (LOG.isDebugEnabled())
            LOG.debug("brooklyn-gc deleted "+deleted.size()+" tasks in over-capacity " + category+" tag categories; "
                    + "capacities now: " + taskTagsInCategoryOverCapacity+"; deleted tasks: "+
                    deleted.stream().map(Task::getId).collect(Collectors.joining(",")));
        return deleted.size();
    }