public void start()

in streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraClient.java [61:142]


  public void start() throws Exception {

    Objects.nonNull(config);

    LOGGER.info("CassandraClient.start {}", config);

    Cluster.Builder builder = Cluster.builder()
        .withPort(config.getPort().intValue())
        .withoutJMXReporting()
        .withoutMetrics()
        .withSocketOptions(
            new SocketOptions()
                .setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS*10)
                .setReadTimeoutMillis(DEFAULT_READ_TIMEOUT_MILLIS*10)
        );

    if( config.getSsl() != null && config.getSsl().getEnabled() == true) {

      Ssl ssl = config.getSsl();

      KeyStore ks = KeyStore.getInstance("JKS");

      InputStream trustStore = new FileInputStream(ssl.getTrustStore());
      ks.load(trustStore, ssl.getTrustStorePassword().toCharArray());
      InputStream keyStore = new FileInputStream(ssl.getKeyStore());
      ks.load(keyStore, ssl.getKeyStorePassword().toCharArray());

      TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
      tmf.init(ks);

      KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
      kmf.init(ks, ssl.getKeyStorePassword().toCharArray());

      SSLContext sslContext = SSLContext.getInstance("SSLv3");
      sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

      SSLOptions sslOptions = JdkSSLOptions.builder()
          .withSSLContext(sslContext)
          .build();

      builder = builder.withSSL(sslOptions);
    }

    Collection<InetSocketAddress> addresses = new ArrayList<>();
    for (String h : config.getHosts()) {
      LOGGER.info("Adding Host: {}", h);
      InetSocketAddress socketAddress = new InetSocketAddress(h, config.getPort().intValue());
      addresses.add(socketAddress);
    }
    builder.addContactPointsWithPorts(addresses);

    if( StringUtils.isNotBlank(config.getUser()) &&
        StringUtils.isNotBlank(config.getPassword())) {
      builder.withCredentials(config.getUser(), config.getPassword());
    }
    cluster = builder.build();

    Objects.nonNull(cluster);

    try {
      Metadata metadata = cluster.getMetadata();
      LOGGER.info("Connected to cluster: {}\n",
          metadata.getClusterName());
      for ( Host host : metadata.getAllHosts() ) {
        LOGGER.info("Datacenter: {}; Host: {}; Rack: {}\n",
            host.getDatacenter(), host.getAddress(), host.getRack());
      }
    } catch( Exception e ) {
      LOGGER.error("Exception: {}", e);
      throw e;
    }

    try {
      session = cluster.connect();
    } catch( Exception e ) {
      LOGGER.error("Exception: {}", e);
      throw e;
    }

    Objects.nonNull(session);

  }