public void rebalance()

in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java [334:397]


    /**
     * Starts a cluster rebalance via the Couchbase CLI and blocks until it finishes or a time limit is hit.
     * <p>
     * Progress is published on {@code CouchbaseNode.REBALANCE_STATUS} as, in order:
     * "explicitly started", "waiting for completion", and finally either "completed" or "timed out".
     * On completion the "rebalancing" not-up indicator is cleared; on timeout it is set with an
     * explanatory message.
     * <p>
     * The rebalance script is run with {@code failOnNonZeroResultCode()}, so a failing CLI invocation is
     * presumably surfaced as an exception by the script helper before any polling starts (confirm against
     * the script helper's contract).
     */
    public void rebalance() {
        entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "explicitly started");
        newScript("rebalance")
                .body.append(
                couchbaseCli("rebalance") + getCouchbaseHostnameAndCredentials())
                .failOnNonZeroResultCode()
                .execute();

        // One probe shared by both polling phases: true as soon as any known node reports rebalancing.
        final Callable<Boolean> anyNodeRebalancing = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) {
                    if (isNodeRebalancing(nodeHostAndPort.toString())) {
                        return true;
                    }
                }
                return false;
            }
        };
        // Negation of the probe above: true only once no node reports rebalancing any more.
        final Callable<Boolean> noNodeRebalancing = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return !anyNodeRebalancing.call();
            }
        };

        // wait until the re-balance is started
        // (if it's quick, this might miss it, but it will only block for 30s if so)
        // The result is deliberately ignored: a missed start must not abort the completion wait below.
        Repeater.create()
                .backoff(Repeater.DEFAULT_REAL_QUICK_PERIOD, 2, Duration.millis(500))
                .limitTimeTo(Duration.THIRTY_SECONDS)
                .until(anyNodeRebalancing)
                .run();

        entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "waiting for completion");
        // Wait until the Couchbase node finishes the re-balancing; the repeater's result is expected to be
        // false when the five-minute limit elapses first, which drives the "timed out" branch below.
        Task<Boolean> reBalance = TaskBuilder.<Boolean>builder()
                .displayName("Waiting until node finishes rebalancing")
                .body(new Callable<Boolean>() {
                    @Override
                    public Boolean call() throws Exception {
                        return Repeater.create()
                                .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
                                .limitTimeTo(Duration.FIVE_MINUTES)
                                .until(noNodeRebalancing)
                                .run();
                    }
                })
                .build();
        // Queue in the current task context when there is one, otherwise submit and block.
        Boolean completed = DynamicTasks.queueIfPossible(reBalance)
                .orSubmitAndBlock()
                .andWaitForSuccess();
        if (completed) {
            entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "completed");
            ServiceStateLogic.ServiceNotUpLogic.clearNotUpIndicator(getEntity(), "rebalancing");
            log.info("Rebalanced cluster via primary node {}", getEntity());
        } else {
            entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "timed out");
            ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(getEntity(), "rebalancing", "rebalance did not complete within time limit");
            log.warn("Timeout rebalancing cluster via primary node {}", getEntity());
        }
    }