in core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java [224:329]
public void distributeEntities() {
try {
synchronized (memberChangeMutex) {
if (Entities.isUnmanagingOrNoLongerManaged(this)) return;
Function<Entity, String> bucketFunctionF = getConfig(BUCKET_FUNCTION);
CustomWorkflowStep bucketFunctionW = getConfig(BUCKET_WORKFLOW);
String bucketFunctionE = getConfig(BUCKET_EXPRESSION);
if (bucketFunctionE != null) {
if (bucketFunctionW != null) LOG.warn("Ignoring bucket workflow because expression supplied");
bucketFunctionW = TypeCoercions.coerce(MutableMap.of("steps", MutableList.of("return " + bucketFunctionE)), CustomWorkflowStep.class);
}
if (bucketFunctionW != null) {
if (bucketFunctionF != null)
LOG.warn("Ignoring bucket function because workflow or expression supplied");
bucketFunctionF = new WorkflowFunction(bucketFunctionW);
}
Function<Entity, String> bucketIdFunctionF = getConfig(BUCKET_ID_FUNCTION);
CustomWorkflowStep bucketIdFunctionW = getConfig(BUCKET_ID_WORKFLOW);
String bucketIdFunctionE = getConfig(BUCKET_ID_EXPRESSION);
if (bucketIdFunctionE != null) {
if (bucketIdFunctionW != null) LOG.warn("Ignoring bucket workflow because expression supplied");
bucketIdFunctionW = TypeCoercions.coerce(MutableMap.of("steps", MutableList.of("return " + bucketIdFunctionE)), CustomWorkflowStep.class);
}
if (bucketIdFunctionW != null) {
if (bucketIdFunctionF != null)
LOG.warn("Ignoring bucket function because workflow or expression supplied");
bucketIdFunctionF = new WorkflowFunction(bucketIdFunctionW);
}
if (bucketFunctionF == null) {
if (bucketIdFunctionF!=null) {
bucketFunctionF = bucketIdFunctionF;
} else {
LOG.warn(this + " should have exactly one of: a bucket expression, workflow, or function (optionally coming from the bucket ID function)");
return;
}
}
EntitySpec<? extends BasicGroup> bucketSpec = getConfig(BUCKET_SPEC);
if (bucketSpec == null) return;
Map<String, BasicGroup> buckets = MutableMap.copyOf(getAttribute(BUCKETS));
// Bucketize the members where the function gives a non-null bucket
Function<Entity, String> bucketFunctionF2 = bucketFunctionF;
Multimap<String, Entity> entityMapping = getMembers().stream().collect(() -> Multimaps.newSetMultimap(MutableMap.of(), MutableSet::new),
(map, entity) -> {
String name = bucketFunctionF2.apply(entity);
if (Strings.isNonBlank(name)) map.put(name, entity);
},
(m1, m2) -> m1.putAll(m2));
// Now fill the buckets
Collection<Entity> oldChildren = getChildren();
for (String name : entityMapping.keySet()) {
BasicGroup bucket = buckets.get(name);
if (bucket == null) {
try {
EntitySpec<? extends BasicGroup> spec = EntitySpec.create(bucketSpec).displayName(name);
if (bucketIdFunctionF != null) {
spec.configure(BrooklynConfigKeys.PLAN_ID, bucketIdFunctionF.apply(entityMapping.get(name).iterator().next()));
}
bucket = addChild(spec);
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
ServiceProblemsLogic.updateProblemsIndicator(this, "children", "Could not add child; removing all new children for now: " + Exceptions.collapseText(e));
// if we don't do this, they get added infinitely often
MutableSet<Entity> newChildren = MutableSet.copyOf(getChildren());
newChildren.removeAll(oldChildren);
for (Entity child : newChildren) {
removeChild(child);
}
throw e;
}
ServiceProblemsLogic.clearProblemsIndicator(this, "children");
buckets.put(name, bucket);
}
bucket.setMembers(entityMapping.get(name));
}
// Remove any now-empty buckets
Set<String> empty = ImmutableSet.copyOf(Sets.difference(buckets.keySet(), entityMapping.keySet()));
for (String name : empty) {
Group removed = buckets.remove(name);
LOG.debug(this + " removing empty child-bucket " + name + " -> " + removed);
removeChild(removed);
Entities.unmanage(removed);
}
// Save the bucket mappings
sensors().set(BUCKETS, ImmutableMap.copyOf(buckets));
}
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
if (Entities.isUnmanagingOrNoLongerManaged(this)) {
LOG.debug("Error in "+this+" when unmanaged, ignoring: "+e);
} else {
throw Exceptions.propagate(e);
}
}
}