in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java [189:234]
/**
 * Makes sure the database schema used by this provider is present and valid.
 *
 * <p>A schema handler is obtained for the configured database type and data
 * source. If the handler reports that the schema does not exist, the schema
 * objects are created, provided the create-schema flag allows it. Whether
 * indexes and foreign key constraints are created is controlled by their own
 * flags. Each flag defaults to {@code "true"} when not configured. In every
 * case that does not throw, the schema is validated at the end.
 *
 * @param context the configuration from which the create-schema,
 *     create-index and create-foreign-key flags are read
 * @throws JdbcChannelException if the schema does not exist and schema
 *     auto-generation is disabled
 */
private void initializeSchema(Context context) {
  // Each flag is looked up with both its current and its OLD_* key constant;
  // presumably the helper falls back to the legacy key - confirm against
  // getConfigurationString.
  String createSchemaFlag = getConfigurationString(context,
      ConfigurationConstants.CONFIG_CREATE_SCHEMA,
      ConfigurationConstants.OLD_CONFIG_CREATE_SCHEMA, "true");

  // parseBoolean yields the primitive directly; anything other than
  // "true" (case-insensitive), including null, is treated as false.
  boolean createSchema = Boolean.parseBoolean(createSchemaFlag);

  LOGGER.debug("Create schema flag set to: " + createSchema);

  // First check if the schema exists
  schemaHandler = SchemaHandlerFactory.getHandler(databaseType, dataSource);

  if (!schemaHandler.schemaExists()) {
    if (!createSchema) {
      throw new JdbcChannelException("Schema does not exist and "
          + "auto-generation is disabled. Please enable auto-generation of "
          + "schema and try again.");
    }

    // The index and foreign key flags are only read when the schema is
    // actually about to be created.
    String createIndexFlag = getConfigurationString(context,
        ConfigurationConstants.CONFIG_CREATE_INDEX,
        ConfigurationConstants.OLD_CONFIG_CREATE_INDEX, "true");

    String createForeignKeysFlag = getConfigurationString(context,
        ConfigurationConstants.CONFIG_CREATE_FK,
        ConfigurationConstants.OLD_CONFIG_CREATE_FK, "true");

    boolean createIndex = Boolean.parseBoolean(createIndexFlag);
    if (!createIndex) {
      LOGGER.warn("Index creation is disabled, indexes will not be created.");
    }

    boolean createForeignKeys = Boolean.parseBoolean(createForeignKeysFlag);
    if (createForeignKeys) {
      LOGGER.info("Foreign Key Constraints will be enabled.");
    } else {
      LOGGER.info("Foreign Key Constraints will be disabled.");
    }

    // Now create schema
    schemaHandler.createSchemaObjects(createForeignKeys, createIndex);
  }

  // Validate all schema objects are as expected
  schemaHandler.validateSchema();
}