in streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java [61:142]
/**
 * Builds the Cassandra {@link Cluster} described by {@code config} and opens a
 * {@link Session} on it, storing both in the {@code cluster} and {@code session} fields.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>configure port, disable JMX reporting and metrics, and set socket timeouts to ten
 *       times the {@code DEFAULT_*_TIMEOUT_MILLIS} constants;</li>
 *   <li>when SSL is enabled in the configuration, build an {@link SSLContext} from the
 *       configured trust store and key store (both read as JKS files);</li>
 *   <li>register every configured host as a contact point on the configured port;</li>
 *   <li>apply user/password credentials when both are non-blank;</li>
 *   <li>log the cluster metadata and connect.</li>
 * </ol>
 *
 * @throws NullPointerException if {@code config} is {@code null}
 * @throws Exception if the key/trust stores cannot be read, the SSL context cannot be
 *     initialised, or the cluster metadata lookup or connection fails (the failure is
 *     logged and rethrown unchanged)
 */
public void start() throws Exception {
  // requireNonNull actually fails fast; Objects.nonNull only returns a boolean.
  Objects.requireNonNull(config, "config");
  // NOTE(review): this logs the whole configuration object; confirm its toString()
  // does not expose the password fields.
  LOGGER.info("CassandraClient.start {}", config);

  Cluster.Builder builder = Cluster.builder()
      .withPort(config.getPort().intValue())
      .withoutJMXReporting()
      .withoutMetrics()
      .withSocketOptions(
          new SocketOptions()
              .setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS * 10)
              .setReadTimeoutMillis(DEFAULT_READ_TIMEOUT_MILLIS * 10)
      );

  Ssl ssl = config.getSsl();
  // Boolean.TRUE.equals tolerates an unset (null) "enabled" flag.
  if (ssl != null && Boolean.TRUE.equals(ssl.getEnabled())) {
    // Trust material and key material live in separate KeyStore instances:
    // KeyStore.load replaces the store's contents, so sharing one instance
    // would discard whichever file was loaded first.
    KeyStore trustStore = KeyStore.getInstance("JKS");
    try (InputStream trustStoreStream = new FileInputStream(ssl.getTrustStore())) {
      trustStore.load(trustStoreStream, ssl.getTrustStorePassword().toCharArray());
    }

    char[] keyStorePassword = ssl.getKeyStorePassword().toCharArray();
    KeyStore keyStore = KeyStore.getInstance("JKS");
    try (InputStream keyStoreStream = new FileInputStream(ssl.getKeyStore())) {
      keyStore.load(keyStoreStream, keyStorePassword);
    }

    TrustManagerFactory tmf =
        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(trustStore);

    KeyManagerFactory kmf =
        KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    kmf.init(keyStore, keyStorePassword);

    // "TLS" lets the JVM negotiate a supported TLS version; "SSLv3" is an
    // obsolete protocol family that modern JDKs disable by default.
    SSLContext sslContext = SSLContext.getInstance("TLS");
    sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

    SSLOptions sslOptions = JdkSSLOptions.builder()
        .withSSLContext(sslContext)
        .build();
    builder = builder.withSSL(sslOptions);
  }

  // Every configured host becomes a contact point on the shared configured port.
  Collection<InetSocketAddress> addresses = new ArrayList<>();
  for (String h : config.getHosts()) {
    LOGGER.info("Adding Host: {}", h);
    addresses.add(new InetSocketAddress(h, config.getPort().intValue()));
  }
  builder.addContactPointsWithPorts(addresses);

  // Credentials are applied only when both user and password are provided.
  if (StringUtils.isNotBlank(config.getUser())
      && StringUtils.isNotBlank(config.getPassword())) {
    builder.withCredentials(config.getUser(), config.getPassword());
  }

  cluster = builder.build();
  Objects.requireNonNull(cluster, "cluster");

  try {
    Metadata metadata = cluster.getMetadata();
    LOGGER.info("Connected to cluster: {}\n",
        metadata.getClusterName());
    for (Host host : metadata.getAllHosts()) {
      LOGGER.info("Datacenter: {}; Host: {}; Rack: {}\n",
          host.getDatacenter(), host.getAddress(), host.getRack());
    }
  } catch (Exception e) {
    // Throwable goes last with no placeholder so the stack trace is logged.
    LOGGER.error("Failed to read Cassandra cluster metadata", e);
    throw e;
  }

  try {
    session = cluster.connect();
  } catch (Exception e) {
    LOGGER.error("Failed to connect to Cassandra cluster", e);
    throw e;
  }
  Objects.requireNonNull(session, "session");
}