in brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java [489:587]
/**
 * Deletes completed tasks all of whose tags in {@code category} are currently listed as over capacity.
 * <p>
 * The method works in two passes. The first pass scans the live tasks: a completed task becomes a
 * deletion candidate only when every one of its tags accepted by {@code category} is a key of
 * {@code taskTagsInCategoryOverCapacity}. A completed task with a mix of listed and unlisted category
 * tags is kept, and the counter of each of its listed tags is decremented instead (removing the tag
 * from the map once its counter reaches zero or below). The second pass sorts the candidates with
 * {@code TASKS_OLDEST_FIRST_COMPARATOR} and deletes them in that order, re-checking before each
 * deletion that all of the task's category tags are still listed.
 *
 * @param taskTagsInCategoryOverCapacity tags of this category with their counters; this map is
 *        modified (entries are removed as counters drop to zero or below)
 * @param taskAllTagsOverCapacity counters looked up for every tag of a deleted task and decremented
 *        on deletion. NOTE(review): presumably shares its {@link AtomicInteger} instances with
 *        {@code taskTagsInCategoryOverCapacity} -- confirm against the caller
 * @param category decides which tags of a task are considered here
 * @param emptyFilterNeeded if true, entries whose counter is already zero or below are removed
 *        from {@code taskTagsInCategoryOverCapacity} before anything else is done
 * @return the number of tasks deleted by this call; 0 if no tag is (or remains) over capacity
 */
protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) {
    if (emptyFilterNeeded) {
        // an earlier run may have lowered some counters; collect those tags first, then drop them,
        // so the map is not modified while its entry set is being iterated
        MutableList<Object> recoveredTags = MutableList.of();
        for (Map.Entry<Object, AtomicInteger> overCapacityEntry : taskTagsInCategoryOverCapacity.entrySet()) {
            if (overCapacityEntry.getValue().get() <= 0) {
                recoveredTags.add(overCapacityEntry.getKey());
            }
        }
        for (Object recoveredTag : recoveredTags) {
            taskTagsInCategoryOverCapacity.remove(recoveredTag);
        }
    }
    if (taskTagsInCategoryOverCapacity.isEmpty()) {
        return 0;
    }

    Collection<Task<?>> liveTasks = executionManager.allTasksLive();
    List<Task<?>> deletionCandidates = MutableList.of();
    try {
        for (Task<?> liveTask : liveTasks) {
            if (!liveTask.isDone()) {
                continue;
            }
            Set<Object> taskTags = liveTask.getTags();

            // split this task's category tags into those listed as over capacity and those not listed
            int overCapacityTagCount = 0;
            int withinCapacityTagCount = 0;
            for (Object taskTag : taskTags) {
                if (!category.acceptsTag(taskTag)) {
                    continue;
                }
                if (taskTagsInCategoryOverCapacity.containsKey(taskTag)) {
                    overCapacityTagCount++;
                } else {
                    withinCapacityTagCount++;
                }
            }

            if (overCapacityTagCount == 0) {
                // nothing over capacity on this task
                continue;
            }
            if (withinCapacityTagCount == 0) {
                // every category tag on this task is over capacity, so it may be deleted
                deletionCandidates.add(liveTask);
                continue;
            }

            // at least one category tag is not over capacity, so the task is kept;
            // give grace to its over-capacity tags by lowering their counters
            for (Object taskTag : taskTags) {
                if (!category.acceptsTag(taskTag)) {
                    continue;
                }
                AtomicInteger overCapacityCounter = taskTagsInCategoryOverCapacity.get(taskTag);
                if (overCapacityCounter == null) {
                    continue;
                }
                if (overCapacityCounter.decrementAndGet() <= 0) {
                    // this tag is no longer over capacity
                    taskTagsInCategoryOverCapacity.remove(taskTag);
                    if (taskTagsInCategoryOverCapacity.isEmpty()) {
                        return 0;
                    }
                }
            }
        }
    } catch (ConcurrentModificationException e) {
        // unclear whether CME's can happen with these data structures;
        // if one does, carry on with the candidates gathered so far
        LOG.debug("Got CME inspecting tasks, with "+deletionCandidates.size()+" found for deletion: "+e);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("brooklyn-gc detected "+taskTagsInCategoryOverCapacity.size()+" "+category+" "
                + "tags over capacity, expiring old tasks; "
                + deletionCandidates.size()+" tasks under consideration; categories are: "
                + taskTagsInCategoryOverCapacity);
    }

    Collections.sort(deletionCandidates, TASKS_OLDEST_FIRST_COMPARATOR);

    // walk the sorted candidates, deleting each one that is still over capacity in all its category tags
    int deleted = 0;
    nextCandidate:
    for (Task<?> candidate : deletionCandidates) {
        for (Object candidateTag : candidate.getTags()) {
            if (category.acceptsTag(candidateTag) && taskTagsInCategoryOverCapacity.get(candidateTag) == null) {
                // an earlier deletion brought this tag back within capacity; keep the task
                continue nextCandidate;
            }
        }

        // delete the task, then update the over-capacity bookkeeping for each of its tags
        deleted++;
        executionManager.deleteTask(candidate);
        for (Object candidateTag : candidate.getTags()) {
            AtomicInteger remaining = taskAllTagsOverCapacity.get(candidateTag);
            if (remaining != null && remaining.decrementAndGet() <= 0) {
                taskTagsInCategoryOverCapacity.remove(candidateTag);
            }
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("brooklyn-gc deleted "+candidate+", buckets now "+taskTagsInCategoryOverCapacity);
        }
        if (taskTagsInCategoryOverCapacity.isEmpty()) {
            break;
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; "
                + "capacities now: " + taskTagsInCategoryOverCapacity);
    }
    return deleted;
}