in samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseBucketRegistry.java [116:159]
private Cluster openCluster(List<String> clusterNodes, CouchbaseEnvironmentConfigs configs) {
DefaultCouchbaseEnvironment.Builder envBuilder = new DefaultCouchbaseEnvironment.Builder();
if (configs.sslEnabled != null) {
envBuilder.sslEnabled(configs.sslEnabled);
}
if (configs.certAuthEnabled != null) {
envBuilder.certAuthEnabled(configs.certAuthEnabled);
}
if (configs.sslKeystoreFile != null) {
envBuilder.sslKeystoreFile(configs.sslKeystoreFile);
}
if (configs.sslKeystorePassword != null) {
envBuilder.sslKeystorePassword(configs.sslKeystorePassword);
}
if (configs.sslTruststoreFile != null) {
envBuilder.sslTruststoreFile(configs.sslTruststoreFile);
}
if (configs.sslTruststorePassword != null) {
envBuilder.sslTruststorePassword(configs.sslTruststorePassword);
}
if (configs.bootstrapCarrierDirectPort != null) {
envBuilder.bootstrapCarrierDirectPort(configs.bootstrapCarrierDirectPort);
}
if (configs.bootstrapCarrierSslPort != null) {
envBuilder.bootstrapCarrierSslPort(configs.bootstrapCarrierSslPort);
}
if (configs.bootstrapHttpDirectPort != null) {
envBuilder.bootstrapHttpDirectPort(configs.bootstrapHttpDirectPort);
}
if (configs.bootstrapHttpSslPort != null) {
envBuilder.bootstrapHttpSslPort(configs.bootstrapHttpSslPort);
}
CouchbaseEnvironment env = envBuilder.build();
Cluster cluster = CouchbaseCluster.create(env, clusterNodes);
if (configs.sslEnabled != null && configs.sslEnabled) {
cluster.authenticate(CertAuthenticator.INSTANCE);
} else if (configs.username != null) {
cluster.authenticate(configs.username, configs.password);
} else {
LOGGER.warn("No authentication is enabled for cluster: {}. This is not recommended except for test cases.",
clusterNodes);
}
return cluster;
}