private DriverConfigLoader createLoader()

in cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java [258:362]


  private DriverConfigLoader createLoader() {
    LOGGER.debug("Creating programmatic config loader");
    // start generation of the config
    ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder();

    Map<DriverOption, String> allOptions = new HashMap<>();

    // set options from main configuration
    String ts = getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS,
            CassandraInterpreter.DEFAULT_CONNECTION_TIMEOUT) + MILLISECONDS_STR;
    allOptions.put(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, ts);
    allOptions.put(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, ts);
    allOptions.put(DefaultDriverOption.REQUEST_TIMEOUT,
            getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS,
                    CassandraInterpreter.DEFAULT_READ_TIMEOUT) + MILLISECONDS_STR);
    addIfNotBlank(allOptions,
            getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, CassandraInterpreter.DEFAULT_TCP_NO_DELAY),
            DefaultDriverOption.SOCKET_TCP_NODELAY);
    addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_KEEP_ALIVE),
            DefaultDriverOption.SOCKET_KEEP_ALIVE);
    addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES),
            DefaultDriverOption.SOCKET_RECEIVE_BUFFER_SIZE);
    addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES),
            DefaultDriverOption.SOCKET_SEND_BUFFER_SIZE);
    addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_REUSE_ADDRESS),
            DefaultDriverOption.SOCKET_REUSE_ADDRESS);
    addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_SO_LINGER),
            DefaultDriverOption.SOCKET_LINGER_INTERVAL);
    addIfNotBlank(allOptions,
            getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE),
            DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE);
    allOptions.put(DefaultDriverOption.REQUEST_CONSISTENCY,
            getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY,
                    CassandraInterpreter.DEFAULT_CONSISTENCY));
    allOptions.put(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY,
            getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY,
                    CassandraInterpreter.DEFAULT_SERIAL_CONSISTENCY));
    allOptions.put(DefaultDriverOption.REQUEST_PAGE_SIZE,
            getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE,
                    CassandraInterpreter.DEFAULT_FETCH_SIZE));
    ts = getProperty(CASSANDRA_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION);
    if (!DEFAULT_VALUE.equalsIgnoreCase(ts)) {
      // for compatibility with previous configurations
      if (ts.equals("4") || ts.equals("3")) {
        ts = "V" + ts;
      }
      allOptions.put(DefaultDriverOption.PROTOCOL_VERSION, ts);
    }
    addIfNotBlank(allOptions, getProperty(CASSANDRA_COMPRESSION_PROTOCOL,
            CassandraInterpreter.DEFAULT_COMPRESSION).toLowerCase(),
            DefaultDriverOption.PROTOCOL_COMPRESSION);
    addIfNotBlankOrDefault(allOptions, getProperty(CASSANDRA_RETRY_POLICY, DEFAULT_POLICY),
            DefaultDriverOption.RETRY_POLICY_CLASS);
    addIfNotBlankOrDefault(allOptions,
            getProperty(CASSANDRA_RECONNECTION_POLICY, DEFAULT_POLICY),
            DefaultDriverOption.RECONNECTION_POLICY_CLASS);
    addIfNotBlankOrDefault(allOptions,
            getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, DEFAULT_POLICY),
            DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS);
    allOptions.put(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE,
            getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL,
                    DEFAULT_CONNECTIONS_PER_HOST));
    allOptions.put(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE,
            getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE,
                    DEFAULT_CONNECTIONS_PER_HOST));
    allOptions.put(DefaultDriverOption.CONNECTION_MAX_REQUESTS,
            getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION,
            DEFAULT_MAX_REQUEST_PER_CONNECTION));
    allOptions.put(DefaultDriverOption.HEARTBEAT_INTERVAL,
            getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS,
                    DEFAULT_HEARTBEAT_INTERVAL) + SECONDS_STR);
    ts = getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS,
            DEFAULT_POOL_TIMEOUT) + MILLISECONDS_STR;
    allOptions.put(DefaultDriverOption.HEARTBEAT_TIMEOUT, ts);
    allOptions.put(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, ts);
    allOptions.put(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
            "DcInferringLoadBalancingPolicy");
    allOptions.put(DefaultDriverOption.RESOLVE_CONTACT_POINTS, "false");
    allOptions.put(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT,
            getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
                    DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS) + SECONDS_STR);

    // extract additional options that may override values set by main configuration
    for (String pname: properties.stringPropertyNames()) {
      if (pname.startsWith(DATASTAX_JAVA_DRIVER_PREFIX)) {
        String pvalue = properties.getProperty(pname);
        LOGGER.info("Custom config values: {} = {}", pname, pvalue);
        String shortName = pname.substring(DATASTAX_JAVA_DRIVER_PREFIX.length());
        if (optionMap.containsKey(shortName)) {
          allOptions.put(optionMap.get(shortName), pvalue);
        } else {
          LOGGER.warn("Incorrect option name: {}", pname);
        }
      }
    }

    for (Map.Entry<DriverOption, String> entry: allOptions.entrySet()) {
      configBuilder.withString(entry.getKey(), entry.getValue());
    }

    DriverConfigLoader loader = configBuilder.endProfile().build();
    LOGGER.debug("Config loader is created");

    return loader;
  }