in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java [334:397]
public void rebalance() {
entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "explicitly started");
newScript("rebalance")
.body.append(
couchbaseCli("rebalance") + getCouchbaseHostnameAndCredentials())
.failOnNonZeroResultCode()
.execute();
// wait until the re-balance is started
// (if it's quick, this might miss it, but it will only block for 30s if so)
Repeater.create()
.backoff(Repeater.DEFAULT_REAL_QUICK_PERIOD, 2, Duration.millis(500))
.limitTimeTo(Duration.THIRTY_SECONDS)
.until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) {
if (isNodeRebalancing(nodeHostAndPort.toString())) {
return true;
}
}
return false;
}
}
).run();
entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "waiting for completion");
// Wait until the Couchbase node finishes the re-balancing
Task<Boolean> reBalance = TaskBuilder.<Boolean>builder()
.displayName("Waiting until node is rebalancing")
.body(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return Repeater.create()
.backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
.limitTimeTo(Duration.FIVE_MINUTES)
.until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) {
if (isNodeRebalancing(nodeHostAndPort.toString())) {
return false;
}
}
return true;
}
})
.run();
}
})
.build();
Boolean completed = DynamicTasks.queueIfPossible(reBalance)
.orSubmitAndBlock()
.andWaitForSuccess();
if (completed) {
entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "completed");
ServiceStateLogic.ServiceNotUpLogic.clearNotUpIndicator(getEntity(), "rebalancing");
log.info("Rebalanced cluster via primary node {}", getEntity());
} else {
entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "timed out");
ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(getEntity(), "rebalancing", "rebalance did not complete within time limit");
log.warn("Timeout rebalancing cluster via primary node {}", getEntity());
}
}