in odps-sqoop/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java [1297:1406]
/**
 * Creates a "changes" table containing the rows of
 * {@code tableContainingUpdates} that differ from their counterparts in
 * {@code tableToBeUpdated}, indexes it on the join columns, and returns its
 * row count.
 *
 * <p>The table is built with a single
 * {@code CREATE TABLE <tableToCreate> NOLOGGING <storage> [PARALLEL] AS SELECT}
 * statement. Rows are matched on {@code joinColumnNames}, with
 * {@code tableContainingUpdates} aliased {@code a} and
 * {@code tableToBeUpdated} aliased {@code b}. A row is kept when any compared
 * column differs, where NULL vs non-NULL also counts as a difference. The
 * OraOop export pseudo columns (partition, subpartition, mapper-row) are
 * excluded from the comparison.
 *
 * <p>For {@code RowsThatDifferPlusNewRows}, the Oracle outer-join marker
 * {@code (+)} is passed to {@code generateUpdateKeyColumnsWhereClauseFragment}
 * so that "new" rows are included too. NOTE(review): this presumes the
 * fragment generator attaches the marker to the {@code b} side; confirm
 * against that method.
 *
 * @param connection connection used to run the DDL and the row-count query
 * @param tableToCreate the changes table to create. Its index name is derived
 *        by replacing every "CHG" in this name with "IDX".
 * @param tableToCreateStorageClause text inserted after {@code NOLOGGING} in
 *        the CREATE TABLE statement
 * @param tableContainingUpdates table aliased {@code a}, whose column values
 *        are selected into the new table
 * @param tableToBeUpdated table aliased {@code b}. Its column list (read via
 *        {@code getTableColumnNames}) defines both the select list and the
 *        compared columns. NOTE(review): this assumes
 *        {@code tableContainingUpdates} has the same columns.
 * @param joinColumnNames key columns used to join {@code a} to {@code b} and
 *        to index the new table
 * @param options which rows to include: only changed rows, or changed rows
 *        plus new rows
 * @param parallelizationEnabled when true, {@code PARALLEL} is added to the
 *        CREATE TABLE statement
 * @return the number of rows in the newly created changes table
 * @throws SQLException if creating the table or index, or counting its rows,
 *         fails
 * @throws RuntimeException if {@code options} is a value this method does not
 *         handle
 */
public static int createExportChangesTable(Connection connection,
    OracleTable tableToCreate, String tableToCreateStorageClause,
    OracleTable tableContainingUpdates, OracleTable tableToBeUpdated,
    String[] joinColumnNames, CreateExportChangesTableOptions options,
    boolean parallelizationEnabled) throws SQLException {

  List<String> columnNames =
      getTableColumnNames(connection, tableToBeUpdated
          , false // <- omitLobAndLongColumnsDuringImport
          , OraOopConstants.Sqoop.Tool.EXPORT
          , true // <- onlyOraOopSupportedTypes
          , false // <- omitOraOopPseudoColumns
      );

  // Select list: every column, read from alias "a".
  StringBuilder columnClause = new StringBuilder();
  for (int idx = 0; idx < columnNames.size(); idx++) {
    if (idx > 0) {
      columnClause.append(",");
    }
    columnClause.append("a.").append(columnNames.get(idx));
  }

  // "Row differs" predicate: an OR of per-column inequality tests. The
  // explicit IS NULL / IS NOT NULL terms are needed because "<>" does not
  // evaluate to true when either operand is NULL.
  StringBuilder rowEqualityClause = new StringBuilder();
  for (String columnName : columnNames) {
    // Omit the OraOop pseudo columns from the comparison. Their values are
    // expected to differ between the two tables, so including them would make
    // EVERY row look changed.
    if (columnName
        .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_PARTITION)
        || columnName
            .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_SUBPARTITION)
        || columnName
            .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_MAPPER_ROW)) {
      continue;
    }
    // Separate on "has anything been written yet", not on the column index.
    // If the first column is a skipped pseudo column, an index-based check
    // would emit a leading "OR" and produce invalid SQL.
    if (rowEqualityClause.length() > 0) {
      rowEqualityClause.append("OR");
    }
    rowEqualityClause.append(String.format("(a.%1$s <> b.%1$s "
        + "OR (a.%1$s IS NULL AND b.%1$s IS NOT NULL) "
        + "OR (a.%1$s IS NOT NULL AND b.%1$s IS NULL))", columnName));
  }

  String sqlJoin = null;
  switch (options) {
    case OnlyRowsThatDiffer:
      sqlJoin = "";
      break;

    case RowsThatDifferPlusNewRows:
      sqlJoin = "(+)"; // <- An outer-join will cause the "new" rows to be
                       // included
      break;

    default:
      throw new RuntimeException(String.format(
          "Update %s to cater for the option \"%s\".", OraOopUtilities
              .getCurrentMethodName(), options.toString()));
  }

  // Positional arguments: 1=new table, 2=a, 3=b, 4=join predicate,
  // 5=select list, 6=row-differs predicate, 7=PARALLEL, 8=storage clause.
  String sql =
      String.format("CREATE TABLE %1$s \n" + "NOLOGGING %8$s \n" + "%7$s \n"
          + "AS \n " + "SELECT \n" + "%5$s \n" + "FROM %2$s a, %3$s b \n"
          + "WHERE (%4$s) \n" + "AND ( \n" + "%6$s \n" + ")", tableToCreate
          .toString(), tableContainingUpdates.toString(), tableToBeUpdated
          .toString(), generateUpdateKeyColumnsWhereClauseFragment(
          joinColumnNames, "a", "b", sqlJoin), columnClause.toString(),
          rowEqualityClause.toString(), parallelizationEnabled ? "PARALLEL"
              : "", tableToCreateStorageClause);

  LOG.info(String.format("The SQL to create the changes-table is:\n%s", sql));

  int changeTableRowCount;
  // Close the statement (and result set) even if any step below fails.
  Statement statement = connection.createStatement();
  try {
    long start = System.nanoTime();
    statement.executeUpdate(sql);
    double timeInSec = (System.nanoTime() - start) / 1e9;
    LOG.info(String.format("Time spent creating change-table: %f sec.",
        timeInSec));

    // Literal (non-regex) substitution, e.g. "...CHG..." -> "...IDX...".
    String indexName = tableToCreate.toString().replace("CHG", "IDX");
    start = System.nanoTime();
    statement.execute(String.format("CREATE INDEX %s ON %s (%s)", indexName,
        tableToCreate.toString(), OraOopUtilities
            .stringArrayToCSV(joinColumnNames)));
    timeInSec = (System.nanoTime() - start) / 1e9;
    LOG.info(String.format("Time spent creating change-table index: %f sec.",
        timeInSec));

    ResultSet resultSet =
        statement.executeQuery(String.format("select count(*) from %s",
            tableToCreate.toString()));
    try {
      resultSet.next();
      changeTableRowCount = resultSet.getInt(1);
    } finally {
      resultSet.close();
    }
  } finally {
    statement.close();
  }

  LOG.info(String.format("The number of rows in the change-table is: %d",
      changeTableRowCount));

  return changeTableRowCount;
}