in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java [531:591]
/**
 * Creates a bucket by invoking {@link CouchbaseNode#BUCKET_CREATE} on the given node, then waits
 * until the {@link CouchbaseCluster#BUCKET_CREATION_IN_PROGRESS} sensor on this cluster is false again.
 * <p>
 * The work is queued in the current task context if there is one, otherwise it is submitted and
 * this call blocks until it finishes. Bucket creations are serialised through the
 * {@code BUCKET_CREATION_IN_PROGRESS} sensor: the task first waits for that sensor to be false,
 * sets it to true, and starts an {@link HttpFeed} that polls
 * {@code /pools/default/buckets/<bucketName>} on the node every 500ms to update the sensor.
 * <p>
 * If the task fails or is interrupted after claiming the sensor, the polling feed is stopped and
 * the sensor is reset to false so that later bucket creations are not blocked indefinitely.
 *
 * @param primaryNode   node whose web admin port and admin credentials are used, and on which the
 *                      {@code BUCKET_CREATE} effector is invoked
 * @param bucketName    name of the bucket; also used in the polled REST URL
 * @param bucketType    passed through to the {@code BUCKET_CREATE} effector
 * @param bucketPort    passed through to the {@code BUCKET_CREATE} effector
 * @param bucketRamSize passed through to the {@code BUCKET_CREATE} effector
 * @param bucketReplica passed through to the {@code BUCKET_CREATE} effector
 */
public void createBucket(final Entity primaryNode, final String bucketName, final String bucketType, final Integer bucketPort, final Integer bucketRamSize, final Integer bucketReplica) {
    DynamicTasks.queueIfPossible(TaskBuilder.<Void>builder().displayName("Creating bucket " + bucketName).body(
        new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                // Wait for any bucket creation already under way, then retire its polling feed.
                DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
                stopBucketCreationFeed();

                sensors().set(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true);
                boolean completed = false;
                try {
                    HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(primaryNode, primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));
                    CouchbaseClusterImpl.this.resetBucketCreation.set(HttpFeed.builder()
                            .entity(CouchbaseClusterImpl.this)
                            .period(500, TimeUnit.MILLISECONDS)
                            .baseUri(String.format("http://%s/pools/default/buckets/%s", hostAndPort, bucketName))
                            .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
                            .poll(new HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS)
                                    .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() {
                                        @Override
                                        public Boolean apply(JsonElement input) {
                                            return isBucketStillInitialising(input, CouchbaseClusterImpl.this.getMembers().size());
                                        }
                                    }))
                                    .onFailureOrException(new Function<Object, Boolean>() {
                                        @Override
                                        public Boolean apply(Object input) {
                                            // A 404 means the bucket does not exist yet, i.e. creation is still in progress.
                                            if (input instanceof HttpToolResponse && ((HttpToolResponse) input).getResponseCode() == 404) {
                                                return true;
                                            }
                                            if (input instanceof Throwable) {
                                                Exceptions.propagate((Throwable) input);
                                            }
                                            throw new IllegalStateException("Unexpected response when creating bucket:" + input);
                                        }
                                    }))
                            .build());
                    // TODO: The result of the effector is not checked here; if it fails asynchronously this
                    // task keeps waiting on the sensor. Bail out in that case to allow the next bucket to proceed.
                    Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica);
                    DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
                    completed = true;
                } finally {
                    // Always stop polling. If we did not finish normally nothing else will clear the
                    // flag once the feed is stopped, so release it to avoid blocking later creations.
                    stopBucketCreationFeed();
                    if (!completed) {
                        sensors().set(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, false);
                    }
                }
                return null;
            }
        }
    ).build()).orSubmitAndBlock();
}

/** Stops the feed currently held in {@code resetBucketCreation}, if there is one. */
private void stopBucketCreationFeed() {
    if (resetBucketCreation.get() != null) {
        resetBucketCreation.get().stop();
    }
}

/**
 * Decides from the {@code nodes} array of a bucket description whether the bucket is still being set up.
 * The bucket counts as ready only when it is listed on {@code expectedNodeCount} nodes and every
 * node publishes a non-empty {@code couchApiBase} element (indicating that the bucket is useable).
 *
 * @return true while bucket creation should still be considered in progress
 */
private static boolean isBucketStillInitialising(JsonElement nodes, int expectedNodeCount) {
    JsonArray servers = nodes.getAsJsonArray();
    if (servers.size() != expectedNodeCount) {
        return true;
    }
    for (JsonElement server : servers) {
        JsonElement api = server.getAsJsonObject().get("couchApiBase");
        // Inspect the JSON value itself: the textual form of a JsonElement is never empty
        // (an empty string renders as two quote characters, a JSON null as "null").
        if (api == null || api.isJsonNull()) {
            return true;
        }
        if (api.isJsonPrimitive() && Strings.isEmpty(api.getAsString())) {
            return true;
        }
    }
    return false;
}