in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java [125:183]
/**
 * Asynchronously registers the given replica set as a shard via a running router.
 * <p>
 * A task is submitted to {@code executor}. Each run looks up a running router, resolves the
 * replica set's primary server and asks the router to add {@code <replicaSetName>/<host>:<port>}
 * as a shard. On success the replica set is added to {@code addedMembers}. If no router or
 * primary is available yet, or the router reports that the add failed, the task re-schedules
 * itself every 3 seconds until 20 minutes have elapsed since this method was called; it then
 * logs a warning and removes the replica set from {@code addingMembers}.
 *
 * @param replicaSet the replica set entity to add as a shard; it is cast to
 *                   {@link MongoDBReplicaSet} when building the shard URL
 */
protected void addShardAsync(final Entity replicaSet) {
    final Duration timeout = Duration.minutes(20);
    final Stopwatch stopwatch = Stopwatch.createStarted();
    final AtomicInteger attempts = new AtomicInteger();
    // TODO Don't use executor, use ExecutionManager; but following pattern in MongoDBReplicaSetImpl for now.
    executor.submit(new Runnable() {
        @Override
        public void run() {
            boolean reschedule;
            // Guard against the router-cluster attribute being null: dereferencing it would
            // throw inside the executor task, and the replica set would then never be
            // removed from addingMembers.
            Entity routerCluster = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER);
            MongoDBRouter router = (routerCluster == null)
                    ? null
                    : routerCluster.getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
            if (router == null) {
                // Use the enclosing entity here; a bare "this" would be the anonymous Runnable.
                LOG.debug("Rescheduling adding shard {} because no running router for cluster {}", replicaSet, MongoDBShardClusterImpl.this);
                reschedule = true;
            } else {
                MongoDBClientSupport client;
                try {
                    client = MongoDBClientSupport.forServer(router);
                } catch (UnknownHostException e) {
                    throw Exceptions.propagate(e);
                }
                try {
                    MongoDBServer primary = replicaSet.getAttribute(MongoDBReplicaSet.PRIMARY_ENTITY);
                    if (primary != null) {
                        String addr = String.format("%s:%d", primary.getAttribute(MongoDBServer.SUBNET_HOSTNAME), primary.getAttribute(MongoDBServer.PORT));
                        String replicaSetURL = ((MongoDBReplicaSet) replicaSet).getName() + "/" + addr;
                        boolean added = client.addShardToRouter(replicaSetURL);
                        if (added) {
                            LOG.info("{} added shard {} via {}", new Object[]{MongoDBShardClusterImpl.this, replicaSetURL, router});
                            addedMembers.add(replicaSet);
                            reschedule = false;
                        } else {
                            LOG.debug("Rescheduling addition of shard {} because add failed via router {}", replicaSetURL, router);
                            reschedule = true;
                        }
                    } else {
                        LOG.debug("Rescheduling addition of shard {} because primary is null", replicaSet);
                        reschedule = true;
                    }
                } catch (Exception e) {
                    // NOTE(review): this propagates out of the executor task without removing
                    // replicaSet from addingMembers - confirm that is intended.
                    LOG.error("Failed to add shard to router {}: ", router, e);
                    throw Exceptions.propagate(e);
                }
            }
            if (reschedule) {
                int numAttempts = attempts.incrementAndGet();
                // Retry for as long as the overall timeout has not elapsed. The previous
                // extra "numAttempts > 1" guard made the very first failure abort at once.
                if (timeout.toMilliseconds() > stopwatch.elapsed(TimeUnit.MILLISECONDS)) {
                    executor.schedule(this, 3, TimeUnit.SECONDS);
                } else {
                    LOG.warn("Timeout after {} attempts ({}) adding shard {}; aborting",
                            new Object[] {numAttempts, Time.makeTimeStringRounded(stopwatch), replicaSet});
                    addingMembers.remove(replicaSet);
                }
            }
        }
    });
}