in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java [458:529]
private void verifyTableStructure(String schemaName, String tableName,
String... columns) {
Set<String> columnNames = new HashSet<String>();
Connection connection = null;
PreparedStatement pStmt = null;
try {
connection = dataSource.getConnection();
pStmt = connection.prepareStatement(COLUMN_LOOKUP_QUERY);
pStmt.setString(1, tableName);
pStmt.setString(2, schemaName);
ResultSet rset = pStmt.executeQuery();
while (rset.next()) {
columnNames.add(rset.getString(1));
}
connection.commit();
} catch (SQLException ex) {
try {
connection.rollback();
} catch (SQLException ex2) {
LOGGER.error("Unable to rollback transaction", ex2);
}
throw new JdbcChannelException("Unable to run query: "
+ COLUMN_LOOKUP_QUERY + ": 1=" + tableName + ", 2=" + schemaName, ex);
} finally {
if (pStmt != null) {
try {
pStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close statement", ex);
}
if (connection != null) {
try {
connection.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close connection", ex);
}
}
}
}
Set<String> columnDiff = new HashSet<String>();
columnDiff.addAll(columnNames);
// Expected Column string form
StringBuilder sb = new StringBuilder("{");
boolean first = true;
for (String column : columns) {
columnDiff.remove(column);
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append(column);
}
sb.append("}");
String expectedColumns = sb.toString();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Table " + schemaName + "." + tableName
+ " expected columns: " + expectedColumns + ", actual columns: "
+ columnNames);
}
if (columnNames.size() != columns.length || columnDiff.size() != 0) {
throw new JdbcChannelException("Expected table " + schemaName + "."
+ tableName + " to have columns: " + expectedColumns + ". Instead "
+ "found columns: " + columnNames);
}
}