in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java [107:174]
public void close() {
if (!active) {
throw new JdbcChannelException("Inactive transaction");
}
count--;
LOGGER.debug("Tx count-close: " + count + ", rollback: " + rollback);
if (count == 0) {
active = false;
try {
if (rollback) {
LOGGER.info("Attempting transaction roll-back");
connection.rollback();
} else {
LOGGER.debug("Attempting transaction commit");
connection.commit();
// Commit successful. Update provider channel size
providerImpl.updateCurrentChannelSize(this.persistedEventCount
- this.removedEventCount);
this.persistedEventCount = 0;
this.removedEventCount = 0;
}
} catch (SQLException ex) {
throw new JdbcChannelException("Unable to finalize transaction", ex);
} finally {
if (connection != null) {
// Log Warnings
try {
SQLWarning warning = connection.getWarnings();
if (warning != null) {
StringBuilder sb = new StringBuilder("Connection warnigns: ");
boolean first = true;
while (warning != null) {
if (first) {
first = false;
} else {
sb.append("; ");
}
sb.append("[").append(warning.getErrorCode()).append("] ");
sb.append(warning.getMessage());
}
LOGGER.warn(sb.toString());
}
} catch (SQLException ex) {
LOGGER.error("Error while retrieving warnigns: "
+ ex.getErrorCode(), ex);
}
// Close Connection
try {
connection.close();
} catch (SQLException ex) {
LOGGER.error(
"Unable to close connection: " + ex.getErrorCode(), ex);
}
}
// Clean up thread local
txFactory.remove();
// Destroy local state
connection = null;
txFactory = null;
}
}
}