in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java [365:569]
private void initializeDataSource(Context context) {
driverClassName = getConfigurationString(context,
ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS,
ConfigurationConstants.OLD_CONFIG_JDBC_DRIVER_CLASS, null);
connectUrl = getConfigurationString(context,
ConfigurationConstants.CONFIG_URL,
ConfigurationConstants.OLD_CONFIG_URL, null);
String userName = getConfigurationString(context,
ConfigurationConstants.CONFIG_USERNAME,
ConfigurationConstants.OLD_CONFIG_USERNAME, null);
String password = getConfigurationString(context,
ConfigurationConstants.CONFIG_PASSWORD,
ConfigurationConstants.OLD_CONFIG_PASSWORD, null);
String jdbcPropertiesFile = getConfigurationString(context,
ConfigurationConstants.CONFIG_JDBC_PROPS_FILE,
ConfigurationConstants.OLD_CONFIG_JDBC_PROPS_FILE, null);
String dbTypeName = getConfigurationString(context,
ConfigurationConstants.CONFIG_DATABASE_TYPE,
ConfigurationConstants.OLD_CONFIG_DATABASE_TYPE, null);
// If connect URL is not specified, use embedded Derby
if (connectUrl == null || connectUrl.trim().length() == 0) {
LOGGER.warn("No connection URL specified. "
+ "Using embedded derby database instance.");
driverClassName = DEFAULT_DRIVER_CLASSNAME;
userName = DEFAULT_USERNAME;
password = DEFAULT_PASSWORD;
dbTypeName = DEFAULT_DBTYPE;
String homePath = System.getProperty("user.home").replace('\\', '/');
String defaultDbDir = homePath + "/.flume/jdbc-channel";
File dbDir = new File(defaultDbDir);
String canonicalDbDirPath = null;
try {
canonicalDbDirPath = dbDir.getCanonicalPath();
} catch (IOException ex) {
throw new JdbcChannelException("Unable to find canonical path of dir: "
+ defaultDbDir, ex);
}
if (!dbDir.exists()) {
if (!dbDir.mkdirs()) {
throw new JdbcChannelException("unable to create directory: "
+ canonicalDbDirPath);
}
}
connectUrl = "jdbc:derby:" + canonicalDbDirPath + "/db;create=true";
// No jdbc properties file will be used
jdbcPropertiesFile = null;
LOGGER.warn("Overriding values for - driver: " + driverClassName
+ ", user: " + userName + "connectUrl: " + connectUrl
+ ", jdbc properties file: " + jdbcPropertiesFile
+ ", dbtype: " + dbTypeName);
}
// Right now only Derby and MySQL supported
databaseType = DatabaseType.getByName(dbTypeName);
switch (databaseType) {
case DERBY:
case MYSQL:
break;
default:
throw new JdbcChannelException("Database " + databaseType
+ " not supported at this time");
}
// Register driver
if (driverClassName == null || driverClassName.trim().length() == 0) {
throw new JdbcChannelException("No jdbc driver specified");
}
try {
Class.forName(driverClassName);
} catch (ClassNotFoundException ex) {
throw new JdbcChannelException("Unable to load driver: "
+ driverClassName, ex);
}
// JDBC Properties
Properties jdbcProps = new Properties();
if (jdbcPropertiesFile != null && jdbcPropertiesFile.trim().length() > 0) {
File jdbcPropsFile = new File(jdbcPropertiesFile.trim());
if (!jdbcPropsFile.exists()) {
throw new JdbcChannelException("Jdbc properties file does not exist: "
+ jdbcPropertiesFile);
}
InputStream inStream = null;
try {
inStream = new FileInputStream(jdbcPropsFile);
jdbcProps.load(inStream);
} catch (IOException ex) {
throw new JdbcChannelException("Unable to load jdbc properties "
+ "from file: " + jdbcPropertiesFile, ex);
} finally {
if (inStream != null) {
try {
inStream.close();
} catch (IOException ex) {
LOGGER.error("Unable to close file: " + jdbcPropertiesFile, ex);
}
}
}
}
if (userName != null) {
Object oldUser = jdbcProps.put("user", userName);
if (oldUser != null) {
LOGGER.warn("Overriding user from: " + oldUser + " to: " + userName);
}
}
if (password != null) {
Object oldPass = jdbcProps.put("password", password);
if (oldPass != null) {
LOGGER.warn("Overriding password from the jdbc properties with "
+ " the one specified explicitly.");
}
}
if (LOGGER.isDebugEnabled()) {
StringBuilder sb = new StringBuilder("JDBC Properties {");
boolean first = true;
Enumeration<?> propertyKeys = jdbcProps.propertyNames();
while (propertyKeys.hasMoreElements()) {
if (first) {
first = false;
} else {
sb.append(", ");
}
String key = (String) propertyKeys.nextElement();
sb.append(key).append("=");
if (key.equalsIgnoreCase("password")) {
sb.append("*******");
} else {
sb.append(jdbcProps.get(key));
}
}
sb.append("}");
LOGGER.debug(sb.toString());
}
// Transaction Isolation
String txIsolation = getConfigurationString(context,
ConfigurationConstants.CONFIG_TX_ISOLATION_LEVEL,
ConfigurationConstants.OLD_CONFIG_TX_ISOLATION_LEVEL,
TransactionIsolation.READ_COMMITTED.getName());
TransactionIsolation txIsolationLevel =
TransactionIsolation.getByName(txIsolation);
LOGGER.debug("Transaction isolation will be set to: " + txIsolationLevel);
// Setup Datasource
ConnectionFactory connFactory =
new DriverManagerConnectionFactory(connectUrl, jdbcProps);
connectionPool = new GenericObjectPool();
String maxActiveConnections = getConfigurationString(context,
ConfigurationConstants.CONFIG_MAX_CONNECTIONS,
ConfigurationConstants.OLD_CONFIG_MAX_CONNECTIONS, "10");
int maxActive = 10;
if (maxActiveConnections != null && maxActiveConnections.length() > 0) {
try {
maxActive = Integer.parseInt(maxActiveConnections);
} catch (NumberFormatException nfe) {
LOGGER.warn("Max active connections has invalid value: "
+ maxActiveConnections + ", Using default: " + maxActive);
}
}
LOGGER.debug("Max active connections for the pool: " + maxActive);
connectionPool.setMaxActive(maxActive);
statementPool = new GenericKeyedObjectPoolFactory(null);
// Creating the factory instance automatically wires the connection pool
new PoolableConnectionFactory(connFactory, connectionPool, statementPool,
databaseType.getValidationQuery(), false, false,
txIsolationLevel.getCode());
dataSource = new PoolingDataSource(connectionPool);
txFactory = new JdbcTransactionFactory(dataSource, this);
}