private void verifyTableStructure()

in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java [458:529]


  private void verifyTableStructure(String schemaName, String tableName,
      String... columns) {
    Set<String> columnNames = new HashSet<String>();
    Connection connection = null;
    PreparedStatement pStmt = null;
    try {
      connection = dataSource.getConnection();
      pStmt = connection.prepareStatement(COLUMN_LOOKUP_QUERY);
      pStmt.setString(1, tableName);
      pStmt.setString(2, schemaName);
      ResultSet rset = pStmt.executeQuery();

      while (rset.next()) {
        columnNames.add(rset.getString(1));
      }
      connection.commit();
    } catch (SQLException ex) {
      try {
        connection.rollback();
      } catch (SQLException ex2) {
        LOGGER.error("Unable to rollback transaction", ex2);
      }
      throw new JdbcChannelException("Unable to run query: "
          + COLUMN_LOOKUP_QUERY + ": 1=" + tableName + ", 2=" + schemaName, ex);
    } finally {
      if (pStmt != null) {
        try {
          pStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close statement", ex);
        }
        if (connection != null) {
          try {
            connection.close();
          } catch (SQLException ex) {
            LOGGER.error("Unable to close connection", ex);
          }
        }
      }
    }

    Set<String> columnDiff = new HashSet<String>();
    columnDiff.addAll(columnNames);

    // Expected Column string form
    StringBuilder sb = new StringBuilder("{");
    boolean first = true;
    for (String column : columns) {
      columnDiff.remove(column);
      if (first) {
        first = false;
      } else {
        sb.append(", ");
      }
      sb.append(column);
    }
    sb.append("}");

    String expectedColumns = sb.toString();

    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Table " + schemaName + "." + tableName
          + " expected columns: " + expectedColumns + ", actual columns: "
          + columnNames);
    }

    if (columnNames.size() != columns.length || columnDiff.size() != 0) {
      throw new JdbcChannelException("Expected table " + schemaName + "."
          + tableName + " to have columns: " + expectedColumns + ". Instead "
          + "found columns: " + columnNames);
    }
  }