in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakClusterImpl.java [160:226]
/**
 * Re-evaluates whether {@code member} should be tracked in {@code RIAK_CLUSTER_NODES}.
 * <p>
 * All work happens while holding {@code mutex}:
 * <ul>
 *   <li>A member that {@code belongsInServerPool} is recorded as the initial node when
 *       {@code IS_FIRST_NODE_SET} is not yet true; otherwise the {@code JOIN_RIAK_CLUSTER}
 *       effector is invoked on it, using the name of a tracked node already flagged as joined.</li>
 *   <li>A member that no longer belongs but is still tracked is dropped from the map, after
 *       invoking {@code REMOVE_FROM_CLUSTER} on another joined node if one can be found.</li>
 * </ul>
 * Afterwards the not-up indicator and the cluster addresses are refreshed.
 *
 * @param member the entity whose membership is being reconsidered
 */
protected void onServerPoolMemberChanged(final Entity member) {
    synchronized (mutex) {
        log.trace("For {}, considering membership of {} which is in locations {}", new Object[]{ this, member, member.getLocations() });

        Map<Entity, String> clusterNodes = getAttribute(RIAK_CLUSTER_NODES);
        if (!belongsInServerPool(member)) {
            // Member is no longer part of the pool; only act if it is still being tracked.
            if (clusterNodes != null && clusterNodes.containsKey(member)) {
                // Block for up to two minutes until the member's joined-cluster flag reads false.
                DependentConfiguration.attributeWhenReady(member, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Predicates.equalTo(false))
                        .blockUntilEnded(Duration.TWO_MINUTES);

                // Pick any other managed node that is flagged as joined to carry out the removal.
                @SuppressWarnings("unchecked")
                Optional<Entity> remainingPeer = Iterables.tryFind(clusterNodes.keySet(), Predicates.and(
                        EntityPredicates.isManaged(),
                        Predicates.instanceOf(RiakNode.class),
                        EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true),
                        Predicates.not(Predicates.equalTo(member))));
                if (remainingPeer.isPresent()) {
                    Entity peer = remainingPeer.get();
                    String departingName = getRiakName(member);
                    Entities.invokeEffectorWithArgs(this, peer, RiakNode.REMOVE_FROM_CLUSTER, departingName).blockUntilEnded();
                }

                // The member is untracked even when no peer was available to issue the removal.
                clusterNodes.remove(member);
                sensors().set(RIAK_CLUSTER_NODES, clusterNodes);
                log.info("Removed Riak node {}: {}; {} from cluster", new Object[]{ this, member, getRiakName(member) });
            }
        } else {
            // TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there?
            // TODO and can we do join as part of node starting?
            if (clusterNodes == null) {
                clusterNodes = Maps.newLinkedHashMap();
            }
            String memberRiakName = Preconditions.checkNotNull(getRiakName(member));

            boolean firstNodeAlreadySet = Boolean.TRUE.equals(getAttribute(IS_FIRST_NODE_SET));
            if (firstNodeAlreadySet) {
                // TODO: be wary of erroneous nodes but are still flagged 'in cluster'
                // A first node exists already, so this member joins through any node flagged as joined.
                Optional<Entity> joinedPeer = Iterables.tryFind(clusterNodes.keySet(), Predicates.and(
                        Predicates.instanceOf(RiakNode.class),
                        EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true)));
                if (!joinedPeer.isPresent()) {
                    log.error("isFirstNodeSet, but no cluster members found to add {}", member.getId());
                } else if (!clusterNodes.containsKey(member) && member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER) == null) {
                    // Only untracked members whose joined flag has never been set are asked to join.
                    String peerName = joinedPeer.get().getAttribute(RiakNode.RIAK_NODE_NAME);
                    Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, peerName).blockUntilEnded();

                    clusterNodes.put(member, memberRiakName);
                    sensors().set(RIAK_CLUSTER_NODES, clusterNodes);
                    log.info("Added Riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
                }
            } else {
                // No first node yet: flag this member as the initial node of the riak cluster.
                sensors().set(IS_FIRST_NODE_SET, Boolean.TRUE);

                clusterNodes.put(member, memberRiakName);
                sensors().set(RIAK_CLUSTER_NODES, clusterNodes);

                ((EntityInternal) member).sensors().set(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);
                log.info("Added initial Riak node {}: {}; {} to new cluster", new Object[] { this, member, getRiakName(member) });
            }
        }

        // Runs on every path that completes normally, whichever branch was taken.
        ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES);
        calculateClusterAddresses();
    }
}