protected void doStart()

in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java [195:274]


    protected void doStart() {
        sensors().set(IS_CLUSTER_INITIALIZED, false);
        
        super.doStart();

        connectSensors();
        
        sensors().set(BUCKET_CREATION_IN_PROGRESS, false);

        //start timeout before adding the servers
        Tasks.setBlockingDetails("Pausing while Couchbase stabilizes");
        Time.sleep(getConfig(NODES_STARTED_STABILIZATION_DELAY));

        Optional<Set<Entity>> upNodes = Optional.<Set<Entity>>fromNullable(getAttribute(COUCHBASE_CLUSTER_UP_NODES));
        if (upNodes.isPresent() && !upNodes.get().isEmpty()) {

            Tasks.setBlockingDetails("Adding servers to Couchbase");
            
            //TODO: select a new primary node if this one fails
            Entity primaryNode = upNodes.get().iterator().next();
            ((EntityInternal) primaryNode).sensors().set(CouchbaseNode.IS_PRIMARY_NODE, true);
            sensors().set(COUCHBASE_PRIMARY_NODE, primaryNode);

            Set<Entity> serversToAdd = MutableSet.<Entity>copyOf(getUpNodes());

            if (serversToAdd.size() >= getQuorumSize() && serversToAdd.size() > 1) {
                log.info("Number of SERVICE_UP nodes:{} in cluster:{} reached Quorum:{}, adding the servers", new Object[]{serversToAdd.size(), getId(), getQuorumSize()});
                addServers(serversToAdd);

                //wait for servers to be added to the couchbase server
                try {
                    Tasks.setBlockingDetails("Delaying before advertising cluster up");
                    Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
                } finally {
                    Tasks.resetBlockingDetails();
                }
                
                getPrimaryNode().rebalance();
            } else {
                if (getQuorumSize()>1) {
                    log.warn(this+" is not quorate; will likely fail later, but proceeding for now");
                }
                for (Entity server: serversToAdd) {
                    ((EntityInternal) server).sensors().set(CouchbaseNode.IS_IN_CLUSTER, true);
                }
            }
                
            if (getConfig(CREATE_BUCKETS)!=null) {
                try {
                    Tasks.setBlockingDetails("Creating buckets in Couchbase");

                    createBuckets();
                    DependentConfiguration.waitInTaskForAttributeReady(this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));

                } finally {
                    Tasks.resetBlockingDetails();
                }
            }

            if (getConfig(REPLICATION)!=null) {
                try {
                    Tasks.setBlockingDetails("Configuring replication rules");

                    List<Map<String, Object>> replRules = getConfig(REPLICATION);
                    for (Map<String, Object> replRule: replRules) {
                        DynamicTasks.queue(Effectors.invocation(getPrimaryNode(), CouchbaseNode.ADD_REPLICATION_RULE, replRule));
                    }
                    DynamicTasks.waitForLast();

                } finally {
                    Tasks.resetBlockingDetails();
                }
            }

            sensors().set(IS_CLUSTER_INITIALIZED, true);
            
        } else {
            throw new IllegalStateException("No up nodes available after starting");
        }
    }