in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java [195:274]
protected void doStart() {
    // Start-up sequence for the cluster entity:
    //   1. mark the cluster as not initialized and delegate to the superclass start;
    //   2. wire up sensors and pause for NODES_STARTED_STABILIZATION_DELAY;
    //   3. pick the first reported up node as primary and, if enough nodes are up,
    //      add the servers and rebalance;
    //   4. optionally create buckets and queue replication rules;
    //   5. mark the cluster as initialized.
    // Throws IllegalStateException if no up nodes are reported after the pause.
    //
    // Each blocking-details message set below is paired with
    // Tasks.resetBlockingDetails() in a finally block, so the message is reset
    // on every exit path (including exceptions) rather than left on the task.
    sensors().set(IS_CLUSTER_INITIALIZED, false);
    super.doStart();
    connectSensors();
    sensors().set(BUCKET_CREATION_IN_PROGRESS, false);

    //start timeout before adding the servers
    try {
        Tasks.setBlockingDetails("Pausing while Couchbase stabilizes");
        Time.sleep(getConfig(NODES_STARTED_STABILIZATION_DELAY));
    } finally {
        Tasks.resetBlockingDetails();
    }

    Optional<Set<Entity>> upNodes = Optional.<Set<Entity>>fromNullable(getAttribute(COUCHBASE_CLUSTER_UP_NODES));
    if (!upNodes.isPresent() || upNodes.get().isEmpty()) {
        throw new IllegalStateException("No up nodes available after starting");
    }

    try {
        Tasks.setBlockingDetails("Adding servers to Couchbase");

        //TODO: select a new primary node if this one fails
        // The primary is simply the first element yielded by the up-nodes set's iterator.
        Entity primaryNode = upNodes.get().iterator().next();
        ((EntityInternal) primaryNode).sensors().set(CouchbaseNode.IS_PRIMARY_NODE, true);
        sensors().set(COUCHBASE_PRIMARY_NODE, primaryNode);

        // Work on a copy so the set being iterated is fixed for the rest of this step.
        Set<Entity> serversToAdd = MutableSet.<Entity>copyOf(getUpNodes());
        if (serversToAdd.size() >= getQuorumSize() && serversToAdd.size() > 1) {
            log.info("Number of SERVICE_UP nodes:{} in cluster:{} reached Quorum:{}, adding the servers", new Object[]{serversToAdd.size(), getId(), getQuorumSize()});
            addServers(serversToAdd);
            //wait for servers to be added to the couchbase server
            try {
                Tasks.setBlockingDetails("Delaying before advertising cluster up");
                Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
            } finally {
                Tasks.resetBlockingDetails();
            }
            getPrimaryNode().rebalance();
        } else {
            // Not enough nodes (or only a single node): skip addServers/rebalance and
            // just flag every up node as being in the cluster.
            if (getQuorumSize() > 1) {
                log.warn(this+" is not quorate; will likely fail later, but proceeding for now");
            }
            for (Entity server : serversToAdd) {
                ((EntityInternal) server).sensors().set(CouchbaseNode.IS_IN_CLUSTER, true);
            }
        }
    } finally {
        Tasks.resetBlockingDetails();
    }

    if (getConfig(CREATE_BUCKETS) != null) {
        try {
            Tasks.setBlockingDetails("Creating buckets in Couchbase");
            createBuckets();
            // Block until BUCKET_CREATION_IN_PROGRESS reads false.
            DependentConfiguration.waitInTaskForAttributeReady(this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
        } finally {
            Tasks.resetBlockingDetails();
        }
    }

    // Read the replication config once; null means no replication was requested.
    List<Map<String, Object>> replRules = getConfig(REPLICATION);
    if (replRules != null) {
        try {
            Tasks.setBlockingDetails("Configuring replication rules");
            // Queue one ADD_REPLICATION_RULE invocation per rule on the primary node,
            // then wait on the last queued task.
            for (Map<String, Object> replRule : replRules) {
                DynamicTasks.queue(Effectors.invocation(getPrimaryNode(), CouchbaseNode.ADD_REPLICATION_RULE, replRule));
            }
            DynamicTasks.waitForLast();
        } finally {
            Tasks.resetBlockingDetails();
        }
    }

    sensors().set(IS_CLUSTER_INITIALIZED, true);
}