public void createBucket()

in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java [531:591]


    /**
     * Creates a bucket via the given primary node and blocks until the cluster reports it usable.
     * <p>
     * The work is wrapped in a task named {@code "Creating bucket <bucketName>"} which is queued
     * in the current task context if possible, otherwise submitted and waited for. Inside the task:
     * <ol>
     *   <li>wait until {@link CouchbaseCluster#BUCKET_CREATION_IN_PROGRESS} is {@code false}, so
     *       only one creation is tracked at a time;</li>
     *   <li>stop any feed left in {@code resetBucketCreation} and mark creation as in progress;</li>
     *   <li>install an HTTP feed polling {@code /pools/default/buckets/<bucketName>} on the primary
     *       node every 500ms, which drives {@code BUCKET_CREATION_IN_PROGRESS};</li>
     *   <li>invoke the {@link CouchbaseNode#BUCKET_CREATE} effector on the primary node;</li>
     *   <li>wait for the feed to flip the sensor back to {@code false}, then stop the feed.</li>
     * </ol>
     *
     * @param primaryNode   node whose web-admin endpoint and admin credentials are used, and on
     *                      which the {@code BUCKET_CREATE} effector is invoked
     * @param bucketName    name of the bucket; also used verbatim as the last path segment of the polled URL
     * @param bucketType    forwarded unchanged to the {@code BUCKET_CREATE} effector
     * @param bucketPort    forwarded unchanged to the {@code BUCKET_CREATE} effector
     * @param bucketRamSize forwarded unchanged to the {@code BUCKET_CREATE} effector
     * @param bucketReplica forwarded unchanged to the {@code BUCKET_CREATE} effector
     */
    public void createBucket(final Entity primaryNode, final String bucketName, final String bucketType, final Integer bucketPort, final Integer bucketRamSize, final Integer bucketReplica) {
        DynamicTasks.queueIfPossible(TaskBuilder.<Void>builder().displayName("Creating bucket " + bucketName).body(
                new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        // Serialise bucket creation: do not start until no other creation is flagged as in progress.
                        DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
                        // Stop a feed left over from an earlier creation before replacing it below.
                        if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) {
                            CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
                        }
                        sensors().set(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true);
                        HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(primaryNode, primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT));

                        // The feed owns BUCKET_CREATION_IN_PROGRESS from here on: it publishes true while the
                        // bucket is missing or incomplete, and false once every member node reports it usable.
                        CouchbaseClusterImpl.this.resetBucketCreation.set(HttpFeed.builder()
                                .entity(CouchbaseClusterImpl.this)
                                .period(500, TimeUnit.MILLISECONDS)
                                .baseUri(String.format("http://%s/pools/default/buckets/%s", hostAndPort, bucketName))
                                .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD))
                                .poll(new HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS)
                                        .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() {
                                            @Override
                                            public Boolean apply(JsonElement input) {
                                                // Wait until bucket has been created on all nodes and the couchApiBase element has been published (indicating that the bucket is useable)
                                                JsonArray servers = input.getAsJsonArray();
                                                if (servers.size() != CouchbaseClusterImpl.this.getMembers().size()) {
                                                    return true;
                                                }
                                                for (JsonElement server : servers) {
                                                    JsonElement api = server.getAsJsonObject().get("couchApiBase");
                                                    // Absent member or explicit JSON null: not published yet.
                                                    if (api == null || api.isJsonNull()) {
                                                        return true;
                                                    }
                                                    // Compare the decoded string value, not the element's JSON rendering:
                                                    // an empty JSON string renders as two quote characters and would
                                                    // never be seen as empty.
                                                    if (api.isJsonPrimitive() && api.getAsString().isEmpty()) {
                                                        return true;
                                                    }
                                                }
                                                return false;
                                            }
                                        }))
                                        .onFailureOrException(new Function<Object, Boolean>() {
                                            @Override
                                            public Boolean apply(Object input) {
                                                // A 404 just means the bucket does not exist yet, so creation is still in progress.
                                                if (input instanceof HttpToolResponse) {
                                                    if (((HttpToolResponse) input).getResponseCode() == 404) {
                                                        return true;
                                                    }
                                                }
                                                if (input instanceof Throwable) {
                                                    Exceptions.propagate((Throwable) input);
                                                }
                                                throw new IllegalStateException("Unexpected response when creating bucket:" + input);
                                            }
                                        }))
                                .build());

                        // TODO: Bail out if bucket creation fails, to allow next bucket to proceed
                        // NOTE(review): the value returned by the effector invocation is discarded here, so
                        // completion is detected only through the feed-driven sensor below.
                        Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica);
                        DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false));
                        // Creation finished: the polling feed is no longer needed.
                        if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) {
                            CouchbaseClusterImpl.this.resetBucketCreation.get().stop();
                        }
                        return null;
                    }
                }
        ).build()).orSubmitAndBlock();
    }