private void initializeChannelState()

in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java [137:187]


  private void initializeChannelState(Context context) {

    String maxCapacityStr = getConfigurationString(context,
        ConfigurationConstants.CONFIG_MAX_CAPACITY,
        ConfigurationConstants.OLD_CONFIG_MAX_CAPACITY, "0");

    long maxCapacitySpecified = 0;
    try {
      maxCapacitySpecified = Long.parseLong(maxCapacityStr);
    } catch (NumberFormatException nfe) {
      LOGGER.warn("Invalid value specified for maximum channel capacity: "
          + maxCapacityStr, nfe);
    }

    if (maxCapacitySpecified > 0) {
      this.maxCapacity = maxCapacitySpecified;
      LOGGER.info("Maximum channel capacity: {}", maxCapacity);
    } else {
      LOGGER.warn("JDBC channel will operate without a capacity limit.");
    }

    if (maxCapacity > 0) {
      // Initialize current size
      JdbcTransactionImpl tx = null;
      try {
        tx = getTransaction();
        tx.begin();
        Connection conn = tx.getConnection();

        currentSize.set(schemaHandler.getChannelSize(conn));
        tx.commit();
      } catch (Exception ex) {
        tx.rollback();
        throw new JdbcChannelException("Failed to initialize current size", ex);
      } finally {
        if (tx != null) {
          tx.close();
        }
      }

      long currentSizeLong = currentSize.get();

      if (currentSizeLong > maxCapacity) {
        LOGGER.warn("The current size of channel (" + currentSizeLong
            + ") is more than the specified maximum capacity (" + maxCapacity
            + "). If this situation persists, it may require resizing and "
            + "replanning of your deployment.");
      }
      LOGGER.info("Current channel size: {}", currentSizeLong);
    }
  }