public void init()

in scylla/src/main/java/site/ycsb/db/scylla/ScyllaCQLClient.java [112:238]


  /**
   * Initializes the cluster and session shared by all client threads.
   *
   * <p>Every call increments {@code INIT_COUNT}. Only a call that finds {@code cluster} unset
   * builds the cluster and opens the session; other calls return immediately. Connection
   * settings (hosts, port, credentials, keyspace, SSL, pooling and socket timeouts, consistency
   * levels, local datacenter and LWT mode) are read from the properties returned by
   * {@code getProperties()}.
   *
   * <p>If initialization fails after the cluster object was built, the cluster is closed and the
   * shared {@code cluster}/{@code session} references are cleared, so a later call does not
   * mistake the half-initialized state for a completed initialization.
   *
   * @throws DBException if the hosts property is missing, or wrapping any other exception raised
   *     while parsing the configuration, building the cluster or connecting
   */
  public void init() throws DBException {

    // Keep track of number of calls to init (for later cleanup)
    INIT_COUNT.incrementAndGet();

    // Synchronized so that we only have a single
    // cluster/session instance for all the threads.
    synchronized (INIT_COUNT) {

      // Check if the cluster has already been initialized
      if (cluster != null) {
        return;
      }

      try {

        debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false"));
        trace = Boolean.parseBoolean(getProperties().getProperty(TRACING_PROPERTY, TRACING_PROPERTY_DEFAULT));

        String host = getProperties().getProperty(HOSTS_PROPERTY);
        if (host == null) {
          throw new DBException(String.format("Required property \"%s\" missing for scyllaCQLClient", HOSTS_PROPERTY));
        }
        // Tolerate whitespace around the separators ("a, b") so that no contact point
        // carries leading or trailing blanks.
        String[] hosts = host.trim().split("\\s*,\\s*");
        String port = getProperties().getProperty(PORT_PROPERTY, PORT_PROPERTY_DEFAULT);

        String username = getProperties().getProperty(USERNAME_PROPERTY);
        String password = getProperties().getProperty(PASSWORD_PROPERTY);

        String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, KEYSPACE_PROPERTY_DEFAULT);

        readConsistencyLevel = ConsistencyLevel.valueOf(
            getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
        writeConsistencyLevel = ConsistencyLevel.valueOf(
            getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));

        boolean useSSL = Boolean.parseBoolean(
            getProperties().getProperty(USE_SSL_CONNECTION, DEFAULT_USE_SSL_CONNECTION));

        Cluster.Builder builder = Cluster.builder()
            .addContactPoints(hosts)
            .withPort(Integer.parseInt(port));

        // Credentials are optional; only pass them when a user name was configured.
        if ((username != null) && !username.isEmpty()) {
          builder = builder.withCredentials(username, password);
        }

        // SSL is independent of authentication: honour the setting even when no
        // credentials were supplied.
        if (useSSL) {
          builder = builder.withSSL();
        }

        final String localDC = getProperties().getProperty(TOKEN_AWARE_LOCAL_DC);
        if (localDC != null && !localDC.isEmpty()) {
          final LoadBalancingPolicy local = DCAwareRoundRobinPolicy.builder().withLocalDc(localDC).build();
          final TokenAwarePolicy tokenAware = new TokenAwarePolicy(local);
          builder = builder.withLoadBalancingPolicy(tokenAware);

          LOGGER.info("Using local datacenter with token awareness: {}\n", localDC);

          // If was not overridden explicitly, set LOCAL_QUORUM
          if (getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY) == null) {
            readConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
          }

          if (getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY) == null) {
            writeConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
          }
        }

        cluster = builder.build();

        // Optional pool sizing, applied only when the property is present.
        String maxConnections = getProperties().getProperty(
            MAX_CONNECTIONS_PROPERTY);
        if (maxConnections != null) {
          cluster.getConfiguration().getPoolingOptions()
              .setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(maxConnections));
        }

        String coreConnections = getProperties().getProperty(
            CORE_CONNECTIONS_PROPERTY);
        if (coreConnections != null) {
          cluster.getConfiguration().getPoolingOptions()
              .setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(coreConnections));
        }

        // Optional socket timeouts, applied only when the property is present.
        String connectTimeoutMillis = getProperties().getProperty(
            CONNECT_TIMEOUT_MILLIS_PROPERTY);
        if (connectTimeoutMillis != null) {
          cluster.getConfiguration().getSocketOptions()
              .setConnectTimeoutMillis(Integer.parseInt(connectTimeoutMillis));
        }

        String readTimeoutMillis = getProperties().getProperty(
            READ_TIMEOUT_MILLIS_PROPERTY);
        if (readTimeoutMillis != null) {
          cluster.getConfiguration().getSocketOptions()
              .setReadTimeoutMillis(Integer.parseInt(readTimeoutMillis));
        }

        Metadata metadata = cluster.getMetadata();
        LOGGER.info("Connected to cluster: {}\n", metadata.getClusterName());

        for (Host discoveredHost : metadata.getAllHosts()) {
          LOGGER.info("Datacenter: {}; Host: {}; Rack: {}\n",
              discoveredHost.getDatacenter(), discoveredHost.getEndPoint().resolve().getAddress(),
              discoveredHost.getRack());
        }

        session = cluster.connect(keyspace);

        // LWT mode overrides whatever consistency levels were selected above.
        if (Boolean.parseBoolean(getProperties().getProperty(SCYLLA_LWT, Boolean.toString(lwt)))) {
          LOGGER.info("Using LWT\n");
          lwt = true;
          readConsistencyLevel = ConsistencyLevel.SERIAL;
          writeConsistencyLevel = ConsistencyLevel.ANY;
        } else {
          LOGGER.info("Not using LWT\n");
        }

        LOGGER.info("Read consistency: {}, Write consistency: {}\n",
            readConsistencyLevel.name(),
            writeConsistencyLevel.name());
      } catch (Exception e) {
        // Do not leave a half-initialized cluster behind: the "already initialized"
        // check above only looks at `cluster`, so a leftover instance would make
        // every later init() return early without a usable session.
        if (cluster != null) {
          try {
            cluster.close();
          } catch (RuntimeException closeFailure) {
            // Keep the original failure as the primary error.
            e.addSuppressed(closeFailure);
          }
          cluster = null;
          session = null;
        }

        // Avoid wrapping a DBException in another DBException.
        if (e instanceof DBException) {
          throw (DBException) e;
        }
        throw new DBException(e);
      }
    } // synchronized
  }