public static int createExportChangesTable()

in odps-sqoop/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java [1297:1406]


  public static int createExportChangesTable(Connection connection,
      OracleTable tableToCreate, String tableToCreateStorageClause,
      OracleTable tableContainingUpdates, OracleTable tableToBeUpdated,
      String[] joinColumnNames, CreateExportChangesTableOptions options,
      boolean parallelizationEnabled) throws SQLException {

    List<String> columnNames =
        getTableColumnNames(connection, tableToBeUpdated
            , false // <- omitLobAndLongColumnsDuringImport
            , OraOopConstants.Sqoop.Tool.EXPORT
            , true // <- onlyOraOopSupportedTypes
            , false // <- omitOraOopPseudoColumns
        );

    StringBuilder columnClause = new StringBuilder(2 * columnNames.size());
    for (int idx = 0; idx < columnNames.size(); idx++) {
      if (idx > 0) {
        columnClause.append(",");
      }
      columnClause.append("a." + columnNames.get(idx));
    }

    StringBuilder rowEqualityClause = new StringBuilder();
    for (int idx = 0; idx < columnNames.size(); idx++) {
      String columnName = columnNames.get(idx);

      // We need to omit the OraOop pseudo columns from the SQL statement that
      // compares the data in
      // the two tables we're interested in. Otherwise, EVERY row will be
      // considered to be changed,
      // since the values in the pseudo columns will differ. (i.e.
      // ORAOOP_EXPORT_SYSDATE will differ.)
      if (columnName
          .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_PARTITION)
          || columnName
             .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_SUBPARTITION)
          || columnName
             .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_MAPPER_ROW)) {
        continue;
      }

      if (idx > 0) {
        rowEqualityClause.append("OR");
      }

      rowEqualityClause.append(String.format("(a.%1$s <> b.%1$s "
          + "OR (a.%1$s IS NULL AND b.%1$s IS NOT NULL) "
          + "OR (a.%1$s IS NOT NULL AND b.%1$s IS NULL))", columnName));
    }

    String sqlJoin = null;
    switch (options) {

      case OnlyRowsThatDiffer:
        sqlJoin = "";
        break;

      case RowsThatDifferPlusNewRows:
        sqlJoin = "(+)"; // <- An outer-join will cause the "new" rows to be
                         // included
        break;

      default:
        throw new RuntimeException(String.format(
            "Update %s to cater for the option \"%s\".", OraOopUtilities
                .getCurrentMethodName(), options.toString()));
    }

    String sql =
        String.format("CREATE TABLE %1$s \n" + "NOLOGGING %8$s \n" + "%7$s \n"
            + "AS \n " + "SELECT \n" + "%5$s \n" + "FROM %2$s a, %3$s b \n"
            + "WHERE (%4$s) \n" + "AND ( \n" + "%6$s \n" + ")", tableToCreate
            .toString(), tableContainingUpdates.toString(), tableToBeUpdated
            .toString(), generateUpdateKeyColumnsWhereClauseFragment(
            joinColumnNames, "a", "b", sqlJoin), columnClause.toString(),
            rowEqualityClause.toString(), parallelizationEnabled ? "PARALLEL"
                : "", tableToCreateStorageClause);

    LOG.info(String.format("The SQL to create the changes-table is:\n%s", sql));

    Statement statement = connection.createStatement();

    long start = System.nanoTime();
    statement.executeUpdate(sql);
    double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
    LOG.info(String.format("Time spent creating change-table: %f sec.",
        timeInSec));

    String indexName = tableToCreate.toString().replaceAll("CHG", "IDX");
    start = System.nanoTime();
    statement.execute(String.format("CREATE INDEX %s ON %s (%s)", indexName,
        tableToCreate.toString(), OraOopUtilities
            .stringArrayToCSV(joinColumnNames)));
    timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
    LOG.info(String.format("Time spent creating change-table index: %f sec.",
        timeInSec));

    int changeTableRowCount = 0;

    ResultSet resultSet =
        statement.executeQuery(String.format("select count(*) from %s",
            tableToCreate.toString()));
    resultSet.next();
    changeTableRowCount = resultSet.getInt(1);
    LOG.info(String.format("The number of rows in the change-table is: %d",
        changeTableRowCount));

    statement.close();
    return changeTableRowCount;
  }