private void initializeDataSource()

in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java [365:569]


  private void initializeDataSource(Context context) {
    driverClassName = getConfigurationString(context,
        ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS,
        ConfigurationConstants.OLD_CONFIG_JDBC_DRIVER_CLASS, null);

    connectUrl = getConfigurationString(context,
        ConfigurationConstants.CONFIG_URL,
        ConfigurationConstants.OLD_CONFIG_URL, null);


    String userName = getConfigurationString(context,
        ConfigurationConstants.CONFIG_USERNAME,
        ConfigurationConstants.OLD_CONFIG_USERNAME, null);

    String password = getConfigurationString(context,
        ConfigurationConstants.CONFIG_PASSWORD,
        ConfigurationConstants.OLD_CONFIG_PASSWORD, null);

    String jdbcPropertiesFile = getConfigurationString(context,
        ConfigurationConstants.CONFIG_JDBC_PROPS_FILE,
        ConfigurationConstants.OLD_CONFIG_JDBC_PROPS_FILE, null);

    String dbTypeName = getConfigurationString(context,
        ConfigurationConstants.CONFIG_DATABASE_TYPE,
        ConfigurationConstants.OLD_CONFIG_DATABASE_TYPE, null);

    // If connect URL is not specified, use embedded Derby
    if (connectUrl == null || connectUrl.trim().length() == 0) {
      LOGGER.warn("No connection URL specified. "
          + "Using embedded derby database instance.");

      driverClassName = DEFAULT_DRIVER_CLASSNAME;
      userName = DEFAULT_USERNAME;
      password = DEFAULT_PASSWORD;
      dbTypeName = DEFAULT_DBTYPE;

      String homePath = System.getProperty("user.home").replace('\\', '/');

      String defaultDbDir = homePath + "/.flume/jdbc-channel";


      File dbDir = new File(defaultDbDir);
      String canonicalDbDirPath = null;

      try {
        canonicalDbDirPath = dbDir.getCanonicalPath();
      } catch (IOException ex) {
        throw new JdbcChannelException("Unable to find canonical path of dir: "
            + defaultDbDir, ex);
      }

      if (!dbDir.exists()) {
        if (!dbDir.mkdirs()) {
          throw new JdbcChannelException("unable to create directory: "
              + canonicalDbDirPath);
        }
      }

      connectUrl = "jdbc:derby:" + canonicalDbDirPath + "/db;create=true";

      // No jdbc properties file will be used
      jdbcPropertiesFile = null;

      LOGGER.warn("Overriding values for - driver: " + driverClassName
          + ", user: " + userName + "connectUrl: " + connectUrl
          + ", jdbc properties file: " + jdbcPropertiesFile
          + ", dbtype: " + dbTypeName);
    }

    // Right now only Derby and MySQL supported
    databaseType = DatabaseType.getByName(dbTypeName);

    switch (databaseType) {
      case DERBY:
      case MYSQL:
        break;
      default:
        throw new JdbcChannelException("Database " + databaseType
            + " not supported at this time");
    }

    // Register driver
    if (driverClassName == null || driverClassName.trim().length() == 0) {
      throw new JdbcChannelException("No jdbc driver specified");
    }

    try {
      Class.forName(driverClassName);
    } catch (ClassNotFoundException ex) {
      throw new JdbcChannelException("Unable to load driver: "
                  + driverClassName, ex);
    }

    // JDBC Properties
    Properties jdbcProps = new Properties();

    if (jdbcPropertiesFile != null && jdbcPropertiesFile.trim().length() > 0) {
      File jdbcPropsFile = new File(jdbcPropertiesFile.trim());
      if (!jdbcPropsFile.exists()) {
        throw new JdbcChannelException("Jdbc properties file does not exist: "
            + jdbcPropertiesFile);
      }

      InputStream inStream = null;
      try {
        inStream = new FileInputStream(jdbcPropsFile);
        jdbcProps.load(inStream);
      } catch (IOException ex) {
        throw new JdbcChannelException("Unable to load jdbc properties "
            + "from file: " + jdbcPropertiesFile, ex);
      } finally {
        if (inStream != null) {
          try {
            inStream.close();
          } catch (IOException ex) {
            LOGGER.error("Unable to close file: " + jdbcPropertiesFile, ex);
          }
        }
      }
    }

    if (userName != null) {
      Object oldUser = jdbcProps.put("user", userName);
      if (oldUser != null) {
        LOGGER.warn("Overriding user from: " + oldUser + " to: " + userName);
      }
    }

    if (password != null) {
      Object oldPass = jdbcProps.put("password", password);
      if (oldPass != null) {
        LOGGER.warn("Overriding password from the jdbc properties with "
            + " the one specified explicitly.");
      }
    }

    if (LOGGER.isDebugEnabled()) {
      StringBuilder sb = new StringBuilder("JDBC Properties {");
      boolean first = true;
      Enumeration<?> propertyKeys = jdbcProps.propertyNames();
      while (propertyKeys.hasMoreElements()) {
        if (first) {
          first = false;
        } else {
          sb.append(", ");
        }
        String key = (String) propertyKeys.nextElement();
        sb.append(key).append("=");
        if (key.equalsIgnoreCase("password")) {
          sb.append("*******");
        } else {
          sb.append(jdbcProps.get(key));
        }
      }

      sb.append("}");

      LOGGER.debug(sb.toString());
    }

    // Transaction Isolation
    String txIsolation = getConfigurationString(context,
        ConfigurationConstants.CONFIG_TX_ISOLATION_LEVEL,
        ConfigurationConstants.OLD_CONFIG_TX_ISOLATION_LEVEL,
        TransactionIsolation.READ_COMMITTED.getName());

    TransactionIsolation txIsolationLevel =
        TransactionIsolation.getByName(txIsolation);

    LOGGER.debug("Transaction isolation will be set to: " + txIsolationLevel);

    // Setup Datasource
    ConnectionFactory connFactory =
        new DriverManagerConnectionFactory(connectUrl, jdbcProps);

    connectionPool = new GenericObjectPool();

    String maxActiveConnections = getConfigurationString(context,
        ConfigurationConstants.CONFIG_MAX_CONNECTIONS,
        ConfigurationConstants.OLD_CONFIG_MAX_CONNECTIONS, "10");

    int maxActive = 10;
    if (maxActiveConnections != null && maxActiveConnections.length() > 0) {
      try {
        maxActive = Integer.parseInt(maxActiveConnections);
      } catch (NumberFormatException nfe) {
        LOGGER.warn("Max active connections has invalid value: "
                + maxActiveConnections + ", Using default: " + maxActive);
      }
    }

    LOGGER.debug("Max active connections for the pool: " + maxActive);
    connectionPool.setMaxActive(maxActive);

    statementPool = new GenericKeyedObjectPoolFactory(null);

    // Creating the factory instance automatically wires the connection pool
    new PoolableConnectionFactory(connFactory, connectionPool, statementPool,
        databaseType.getValidationQuery(), false, false,
        txIsolationLevel.getCode());

    dataSource = new PoolingDataSource(connectionPool);

    txFactory = new JdbcTransactionFactory(dataSource, this);
  }