in scylla/src/main/java/site/ycsb/db/scylla/ScyllaCQLClient.java [112:238]
public void init() throws DBException {
// Keep track of number of calls to init (for later cleanup)
INIT_COUNT.incrementAndGet();
// Synchronized so that we only have a single
// cluster/session instance for all the threads.
synchronized (INIT_COUNT) {
// Check if the cluster has already been initialized
if (cluster != null) {
return;
}
try {
debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false"));
trace = Boolean.parseBoolean(getProperties().getProperty(TRACING_PROPERTY, TRACING_PROPERTY_DEFAULT));
String host = getProperties().getProperty(HOSTS_PROPERTY);
if (host == null) {
throw new DBException(String.format("Required property \"%s\" missing for scyllaCQLClient", HOSTS_PROPERTY));
}
String[] hosts = host.split(",");
String port = getProperties().getProperty(PORT_PROPERTY, PORT_PROPERTY_DEFAULT);
String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, KEYSPACE_PROPERTY_DEFAULT);
readConsistencyLevel = ConsistencyLevel.valueOf(
getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
writeConsistencyLevel = ConsistencyLevel.valueOf(
getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
boolean useSSL = Boolean.parseBoolean(
getProperties().getProperty(USE_SSL_CONNECTION, DEFAULT_USE_SSL_CONNECTION));
Cluster.Builder builder;
if ((username != null) && !username.isEmpty()) {
builder = Cluster.builder().withCredentials(username, password)
.addContactPoints(hosts).withPort(Integer.parseInt(port));
if (useSSL) {
builder = builder.withSSL();
}
} else {
builder = Cluster.builder().withPort(Integer.parseInt(port))
.addContactPoints(hosts);
}
final String localDC = getProperties().getProperty(TOKEN_AWARE_LOCAL_DC);
if (localDC != null && !localDC.isEmpty()) {
final LoadBalancingPolicy local = DCAwareRoundRobinPolicy.builder().withLocalDc(localDC).build();
final TokenAwarePolicy tokenAware = new TokenAwarePolicy(local);
builder = builder.withLoadBalancingPolicy(tokenAware);
LOGGER.info("Using local datacenter with token awareness: {}\n", localDC);
// If was not overridden explicitly, set LOCAL_QUORUM
if (getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY) == null) {
readConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
}
if (getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY) == null) {
writeConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
}
}
cluster = builder.build();
String maxConnections = getProperties().getProperty(
MAX_CONNECTIONS_PROPERTY);
if (maxConnections != null) {
cluster.getConfiguration().getPoolingOptions()
.setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(maxConnections));
}
String coreConnections = getProperties().getProperty(
CORE_CONNECTIONS_PROPERTY);
if (coreConnections != null) {
cluster.getConfiguration().getPoolingOptions()
.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(coreConnections));
}
String connectTimeoutMillis = getProperties().getProperty(
CONNECT_TIMEOUT_MILLIS_PROPERTY);
if (connectTimeoutMillis != null) {
cluster.getConfiguration().getSocketOptions()
.setConnectTimeoutMillis(Integer.parseInt(connectTimeoutMillis));
}
String readTimeoutMillis = getProperties().getProperty(
READ_TIMEOUT_MILLIS_PROPERTY);
if (readTimeoutMillis != null) {
cluster.getConfiguration().getSocketOptions()
.setReadTimeoutMillis(Integer.parseInt(readTimeoutMillis));
}
Metadata metadata = cluster.getMetadata();
LOGGER.info("Connected to cluster: {}\n", metadata.getClusterName());
for (Host discoveredHost : metadata.getAllHosts()) {
LOGGER.info("Datacenter: {}; Host: {}; Rack: {}\n",
discoveredHost.getDatacenter(), discoveredHost.getEndPoint().resolve().getAddress(),
discoveredHost.getRack());
}
session = cluster.connect(keyspace);
if (Boolean.parseBoolean(getProperties().getProperty(SCYLLA_LWT, Boolean.toString(lwt)))) {
LOGGER.info("Using LWT\n");
lwt = true;
readConsistencyLevel = ConsistencyLevel.SERIAL;
writeConsistencyLevel = ConsistencyLevel.ANY;
} else {
LOGGER.info("Not using LWT\n");
}
LOGGER.info("Read consistency: {}, Write consistency: {}\n",
readConsistencyLevel.name(),
writeConsistencyLevel.name());
} catch (Exception e) {
throw new DBException(e);
}
} // synchronized
}