in core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java [598:717]
protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) {
if (emptyFilterNeeded) {
// previous run may have decremented counts
MutableList<Object> nowOkayTags = MutableList.of();
for (Map.Entry<Object,AtomicInteger> entry: taskTagsInCategoryOverCapacity.entrySet()) {
if (entry.getValue().get()<=0) nowOkayTags.add(entry.getKey());
}
for (Object tag: nowOkayTags) taskTagsInCategoryOverCapacity.remove(tag);
}
if (taskTagsInCategoryOverCapacity.isEmpty())
return 0;
// TODO Skip tasks that will be evicted anyway (transient, expired)
// https://issues.apache.org/jira/browse/BROOKLYN-401
Collection<Task<?>> tasks = executionManager.allTasksLive();
List<Task<?>> tasksToConsiderDeleting = MutableList.of();
try {
for (Task<?> task: tasks) {
if (!task.isDone(true)) continue;
Set<Object> tags = TaskTags.getTagsFast(task);
int categoryTags = 0, tooFullCategoryTags = 0;
for (Object tag: tags) {
if (category.acceptsTag(tag)) {
categoryTags++;
if (taskTagsInCategoryOverCapacity.containsKey(tag))
tooFullCategoryTags++;
}
}
if (tooFullCategoryTags>0) {
if (categoryTags==tooFullCategoryTags) {
// all buckets are full, delete this one
tasksToConsiderDeleting.add(task);
} else {
// if any bucket is under capacity, then give grace to the other buckets in this category
for (Object tag: tags) {
if (category.acceptsTag(tag)) {
AtomicInteger over = taskTagsInCategoryOverCapacity.get(tag);
if (over!=null) {
if (over.decrementAndGet()<=0) {
// and remove it from over-capacity if so
taskTagsInCategoryOverCapacity.remove(tag);
if (taskTagsInCategoryOverCapacity.isEmpty())
return 0;
}
}
}
}
}
}
}
} catch (ConcurrentModificationException e) {
// do CME's happen with these data structures?
// if so, let's just delete what we've found so far
LOG.debug("Got CME inspecting tasks, with "+tasksToConsiderDeleting.size()+" found for deletion: "+e);
}
if (tasksToConsiderDeleting.isEmpty()) {
return 0;
}
Collections.sort(tasksToConsiderDeleting, TASKS_OLDEST_FIRST_COMPARATOR);
if (LOG.isDebugEnabled()) {
List<Object> tasksToLog = MutableList.copyOf(tasksToConsiderDeleting);
if (tasksToConsiderDeleting.size()>10) {
tasksToConsiderDeleting.stream().limit(5).forEach(tasksToLog::add);
tasksToLog.add("...");
tasksToConsiderDeleting.stream().skip(tasksToConsiderDeleting.size()-5).forEach(tasksToLog::add);
} else {
tasksToLog.addAll(tasksToConsiderDeleting);
}
LOG.debug("brooklyn-gc detected " + taskTagsInCategoryOverCapacity.size() + " " + category + " "
+ "tag(s) over capacity, expiring old tasks; "
+ tasksToConsiderDeleting.size() + " tasks under consideration; categories are: "
+ taskTagsInCategoryOverCapacity + "; including " + tasksToLog);
}
// now try deleting tasks which are overcapacity for each (non-entity) tag
Set<Task<?>> deleted = MutableSet.of();
for (Task<?> task: tasksToConsiderDeleting) {
boolean delete = false;
for (Object tag: task.getTags()) {
if (!category.acceptsTag(tag)) {
// ignore this tag, not right for the category
continue;
}
if (taskTagsInCategoryOverCapacity.get(tag)==null) {
// no longer over capacity in this tag
delete = false;
break;
}
// has at least one tag in the category, and all such tags are overcapacity
delete = true;
}
if (delete) {
// delete this and update overcapacity info
deleted.add(task);
executionManager.deleteTask(task);
for (Object tag: task.getTags()) {
AtomicInteger counter = taskAllTagsOverCapacity.get(tag);
if (counter!=null && counter.decrementAndGet()<=0)
taskTagsInCategoryOverCapacity.remove(tag);
}
if (LOG.isTraceEnabled())
LOG.trace("brooklyn-gc deleted "+task+", buckets now "+taskTagsInCategoryOverCapacity);
if (taskTagsInCategoryOverCapacity.isEmpty())
break;
}
}
if (LOG.isDebugEnabled())
LOG.debug("brooklyn-gc deleted "+deleted.size()+" tasks in over-capacity " + category+" tag categories; "
+ "capacities now: " + taskTagsInCategoryOverCapacity+"; deleted tasks: "+
deleted.stream().map(Task::getId).collect(Collectors.joining(",")));
return deleted.size();
}