protected void onServerPoolMemberChanged()

in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakClusterImpl.java [160:226]


    /**
     * Reconciles the {@code RIAK_CLUSTER_NODES} sensor with the current state of {@code member}.
     * <p>
     * All work happens while holding {@code mutex}:
     * <ul>
     *   <li>If the member no longer belongs in the server pool but is still recorded, this waits
     *       (bounded by {@link Duration#TWO_MINUTES}) for its {@code RIAK_NODE_HAS_JOINED_CLUSTER}
     *       attribute to equal {@code false}, asks another managed, joined node to invoke
     *       {@code REMOVE_FROM_CLUSTER} for it (if such a node exists), and drops it from the map.</li>
     *   <li>If the member belongs in the pool and {@code IS_FIRST_NODE_SET} is not yet {@code true},
     *       the member is recorded as the initial node and flagged as having joined.</li>
     *   <li>If the member belongs in the pool and a first node was already flagged, the member is
     *       told to join via any recorded node that reports having joined; if no such node is
     *       found an error is logged and the member is not recorded.</li>
     * </ul>
     * Finally the not-up indicator and the cluster addresses are refreshed.
     *
     * @param member the entity whose pool membership is being (re)evaluated
     * @throws NullPointerException if the member belongs in the pool but has no riak name
     */
    protected void onServerPoolMemberChanged(final Entity member) {
        synchronized (mutex) {
            log.trace(
                    "For {}, considering membership of {} which is in locations {}",
                    new Object[]{ this, member, member.getLocations() });

            final Map<Entity, String> knownNodes = getAttribute(RIAK_CLUSTER_NODES);

            if (!belongsInServerPool(member)) {
                // Member is leaving (or never qualified): only act if we had recorded it.
                if (knownNodes != null && knownNodes.containsKey(member)) {
                    DependentConfiguration
                            .attributeWhenReady(member, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Predicates.equalTo(false))
                            .blockUntilEnded(Duration.TWO_MINUTES);

                    // Pick some other live node to issue the removal from.
                    @SuppressWarnings("unchecked")
                    Optional<Entity> remainingPeer = Iterables.tryFind(knownNodes.keySet(), Predicates.and(
                            EntityPredicates.isManaged(),
                            Predicates.instanceOf(RiakNode.class),
                            EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true),
                            Predicates.not(Predicates.equalTo(member))));
                    if (remainingPeer.isPresent()) {
                        Entities.invokeEffectorWithArgs(
                                this, remainingPeer.get(), RiakNode.REMOVE_FROM_CLUSTER, getRiakName(member))
                                .blockUntilEnded();
                    }

                    knownNodes.remove(member);
                    sensors().set(RIAK_CLUSTER_NODES, knownNodes);
                    log.info("Removed Riak node {}: {}; {} from cluster", new Object[]{ this, member, getRiakName(member) });
                }
            } else {
                // TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there?
                // TODO and can we do join as part of node starting?

                // Reuse the published map when there is one; otherwise start a fresh ordered map.
                final Map<Entity, String> poolNodes =
                        (knownNodes != null) ? knownNodes : Maps.<Entity, String>newLinkedHashMap();
                final String memberRiakName = Preconditions.checkNotNull(getRiakName(member));

                final boolean clusterAlreadySeeded = Boolean.TRUE.equals(getAttribute(IS_FIRST_NODE_SET));
                if (clusterAlreadySeeded) {
                    // TODO: be wary of erroneous nodes but are still flagged 'in cluster'
                    // A first node exists, so the newcomer joins through any node already in the cluster.
                    Optional<Entity> joinedPeer = Iterables.tryFind(poolNodes.keySet(), Predicates.and(
                            Predicates.instanceOf(RiakNode.class),
                            EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true)));

                    if (!joinedPeer.isPresent()) {
                        log.error("isFirstNodeSet, but no cluster members found to add {}", member.getId());
                    } else if (!poolNodes.containsKey(member)
                            && member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER) == null) {
                        final Entity peer = joinedPeer.get();
                        final String peerName = peer.getAttribute(RiakNode.RIAK_NODE_NAME);
                        Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, peerName)
                                .blockUntilEnded();

                        poolNodes.put(member, memberRiakName);
                        sensors().set(RIAK_CLUSTER_NODES, poolNodes);
                        log.info("Added Riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
                    }
                } else {
                    // No first node flagged yet: this member becomes the first node of the cluster.
                    sensors().set(IS_FIRST_NODE_SET, Boolean.TRUE);

                    poolNodes.put(member, memberRiakName);
                    sensors().set(RIAK_CLUSTER_NODES, poolNodes);

                    final EntityInternal seedNode = (EntityInternal) member;
                    seedNode.sensors().set(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);

                    log.info("Added initial Riak node {}: {}; {} to new cluster", new Object[] { this, member, getRiakName(member) });
                }
            }

            ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES);

            calculateClusterAddresses();
        }
    }