in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java [137:187]
/**
 * Reads the maximum-capacity setting from the supplied context and, when a
 * positive capacity is in effect, seeds {@code currentSize} with the channel
 * size reported by the schema handler.
 *
 * <p>The capacity string is looked up under
 * {@code ConfigurationConstants.CONFIG_MAX_CAPACITY} and
 * {@code ConfigurationConstants.OLD_CONFIG_MAX_CAPACITY}, defaulting to
 * {@code "0"}. A value that cannot be parsed as a {@code long} is logged and
 * treated as {@code 0}; a non-positive value leaves {@code maxCapacity}
 * untouched and a warning is logged that no capacity limit applies.
 *
 * @param context the configuration context to read the capacity setting from
 * @throws JdbcChannelException if the current channel size cannot be read
 */
private void initializeChannelState(Context context) {
  String maxCapacityStr = getConfigurationString(context,
      ConfigurationConstants.CONFIG_MAX_CAPACITY,
      ConfigurationConstants.OLD_CONFIG_MAX_CAPACITY, "0");

  long maxCapacitySpecified = 0;
  try {
    maxCapacitySpecified = Long.parseLong(maxCapacityStr);
  } catch (NumberFormatException nfe) {
    // Fall through with 0 so that an unparsable value behaves like "not set".
    LOGGER.warn("Invalid value specified for maximum channel capacity: "
        + maxCapacityStr, nfe);
  }

  if (maxCapacitySpecified > 0) {
    this.maxCapacity = maxCapacitySpecified;
    LOGGER.info("Maximum channel capacity: {}", maxCapacity);
  } else {
    LOGGER.warn("JDBC channel will operate without a capacity limit.");
  }

  if (maxCapacity > 0) {
    // Initialize current size
    JdbcTransactionImpl tx = null;
    try {
      tx = getTransaction();
      tx.begin();
      Connection conn = tx.getConnection();
      currentSize.set(schemaHandler.getChannelSize(conn));
      tx.commit();
    } catch (Exception ex) {
      // tx is still null when getTransaction() itself failed; calling
      // rollback() on it would raise a NullPointerException and hide ex.
      if (tx != null) {
        try {
          tx.rollback();
        } catch (Exception rollbackEx) {
          // Do not let a failed rollback replace the original cause.
          LOGGER.warn("Failed to roll back transaction while initializing "
              + "current size", rollbackEx);
        }
      }
      throw new JdbcChannelException("Failed to initialize current size", ex);
    } finally {
      if (tx != null) {
        tx.close();
      }
    }

    long currentSizeLong = currentSize.get();
    if (currentSizeLong > maxCapacity) {
      LOGGER.warn("The current size of channel (" + currentSizeLong
          + ") is more than the specified maximum capacity (" + maxCapacity
          + "). If this situation persists, it may require resizing and "
          + "replanning of your deployment.");
    }
    LOGGER.info("Current channel size: {}", currentSizeLong);
  }
}