public void distributeEntities()

in core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java [224:329]


    public void distributeEntities() {
        try {
            synchronized (memberChangeMutex) {
                if (Entities.isUnmanagingOrNoLongerManaged(this)) return;

                Function<Entity, String> bucketFunctionF = getConfig(BUCKET_FUNCTION);
                CustomWorkflowStep bucketFunctionW = getConfig(BUCKET_WORKFLOW);
                String bucketFunctionE = getConfig(BUCKET_EXPRESSION);

                if (bucketFunctionE != null) {
                    if (bucketFunctionW != null) LOG.warn("Ignoring bucket workflow because expression supplied");
                    bucketFunctionW = TypeCoercions.coerce(MutableMap.of("steps", MutableList.of("return " + bucketFunctionE)), CustomWorkflowStep.class);
                }
                if (bucketFunctionW != null) {
                    if (bucketFunctionF != null)
                        LOG.warn("Ignoring bucket function because workflow or expression supplied");
                    bucketFunctionF = new WorkflowFunction(bucketFunctionW);
                }

                Function<Entity, String> bucketIdFunctionF = getConfig(BUCKET_ID_FUNCTION);
                CustomWorkflowStep bucketIdFunctionW = getConfig(BUCKET_ID_WORKFLOW);
                String bucketIdFunctionE = getConfig(BUCKET_ID_EXPRESSION);

                if (bucketIdFunctionE != null) {
                    if (bucketIdFunctionW != null) LOG.warn("Ignoring bucket workflow because expression supplied");
                    bucketIdFunctionW = TypeCoercions.coerce(MutableMap.of("steps", MutableList.of("return " + bucketIdFunctionE)), CustomWorkflowStep.class);
                }
                if (bucketIdFunctionW != null) {
                    if (bucketIdFunctionF != null)
                        LOG.warn("Ignoring bucket function because workflow or expression supplied");
                    bucketIdFunctionF = new WorkflowFunction(bucketIdFunctionW);
                }

                if (bucketFunctionF == null) {
                    if (bucketIdFunctionF!=null) {
                        bucketFunctionF = bucketIdFunctionF;
                    } else {
                        LOG.warn(this + " should have exactly one of: a bucket expression, workflow, or function (optionally coming from the bucket ID function)");
                        return;
                    }
                }

                EntitySpec<? extends BasicGroup> bucketSpec = getConfig(BUCKET_SPEC);
                if (bucketSpec == null) return;

                Map<String, BasicGroup> buckets = MutableMap.copyOf(getAttribute(BUCKETS));

                // Bucketize the members where the function gives a non-null bucket
                Function<Entity, String> bucketFunctionF2 = bucketFunctionF;
                Multimap<String, Entity> entityMapping = getMembers().stream().collect(() -> Multimaps.newSetMultimap(MutableMap.of(), MutableSet::new),
                        (map, entity) -> {
                            String name = bucketFunctionF2.apply(entity);
                            if (Strings.isNonBlank(name)) map.put(name, entity);
                        },
                        (m1, m2) -> m1.putAll(m2));

                // Now fill the buckets
                Collection<Entity> oldChildren = getChildren();
                for (String name : entityMapping.keySet()) {
                    BasicGroup bucket = buckets.get(name);
                    if (bucket == null) {
                        try {
                            EntitySpec<? extends BasicGroup> spec = EntitySpec.create(bucketSpec).displayName(name);
                            if (bucketIdFunctionF != null) {
                                spec.configure(BrooklynConfigKeys.PLAN_ID, bucketIdFunctionF.apply(entityMapping.get(name).iterator().next()));
                            }

                            bucket = addChild(spec);
                        } catch (Exception e) {
                            Exceptions.propagateIfFatal(e);
                            ServiceProblemsLogic.updateProblemsIndicator(this, "children", "Could not add child; removing all new children for now: " + Exceptions.collapseText(e));
                            // if we don't do this, they get added infinitely often
                            MutableSet<Entity> newChildren = MutableSet.copyOf(getChildren());
                            newChildren.removeAll(oldChildren);
                            for (Entity child : newChildren) {
                                removeChild(child);
                            }
                            throw e;
                        }
                        ServiceProblemsLogic.clearProblemsIndicator(this, "children");
                        buckets.put(name, bucket);
                    }
                    bucket.setMembers(entityMapping.get(name));
                }

                // Remove any now-empty buckets
                Set<String> empty = ImmutableSet.copyOf(Sets.difference(buckets.keySet(), entityMapping.keySet()));
                for (String name : empty) {
                    Group removed = buckets.remove(name);
                    LOG.debug(this + " removing empty child-bucket " + name + " -> " + removed);
                    removeChild(removed);
                    Entities.unmanage(removed);
                }

                // Save the bucket mappings
                sensors().set(BUCKETS, ImmutableMap.copyOf(buckets));
            }
        } catch (Exception e) {
            Exceptions.propagateIfFatal(e);
            if (Entities.isUnmanagingOrNoLongerManaged(this)) {
                LOG.debug("Error in "+this+" when unmanaged, ignoring: "+e);
            } else {
                throw Exceptions.propagate(e);
            }
        }
    }