in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java [286:318]
/**
 * Stores the given event for the given channel inside a single transaction.
 *
 * <p>When {@code maxCapacity} is positive, the current size is checked first
 * and the event is rejected if the channel is already at or above capacity.
 * The capacity check happens inside the {@code try} block, so a capacity
 * failure is reported the same way as any other failure: wrapped in a
 * {@code JdbcChannelException} with the message "Failed to persist event".
 *
 * @param channel the name of the channel the event belongs to
 * @param event the event to persist
 * @throws JdbcChannelException if the transaction cannot be obtained, the
 *         channel is at capacity, or storing/committing the event fails
 */
public void persistEvent(String channel, Event event) {
  PersistableEvent persistableEvent = new PersistableEvent(channel, event);
  JdbcTransactionImpl tx = null;
  try {
    tx = getTransaction();
    tx.begin();

    // A non-positive maxCapacity disables the capacity check.
    if (maxCapacity > 0) {
      long currentSizeLong = currentSize.get();
      if (currentSizeLong >= maxCapacity) {
        throw new JdbcChannelException("Channel capacity reached: "
            + "maxCapacity: " + maxCapacity + ", currentSize: "
            + currentSizeLong);
      }
    }

    // Persist the persistableEvent
    schemaHandler.storeEvent(persistableEvent, tx.getConnection());
    tx.incrementPersistedEventCount();
    tx.commit();
  } catch (Exception ex) {
    // tx is still null if getTransaction() itself failed; guard the rollback
    // so a NullPointerException does not mask the original failure.
    if (tx != null) {
      tx.rollback();
    }
    throw new JdbcChannelException("Failed to persist event", ex);
  } finally {
    if (tx != null) {
      tx.close();
    }
  }

  LOGGER.debug("Persisted event: {}", persistableEvent.getEventId());
}