in odps-sqoop/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatUpdate.java [242:363]
/**
 * Applies the rows exported by this mapper to the main export table.
 *
 * <p>The target table is identified by the {@code ORAOOP_TABLE_OWNER} and
 * {@code ORAOOP_TABLE_NAME} configuration entries. A temporary "changes"
 * table (named from this mapper's id plus {@code "_CHG"} and the job
 * sys-date) is created from this mapper's table and the target table; its
 * content is then applied to the target table using the configured
 * {@link ExportTableUpdateTechnique}: delete-then-insert, an update, or a
 * merge. If the changes table is reported to hold no rows, the method logs
 * that and returns without touching the target table.
 *
 * <p>The work is committed on success. If an {@link SQLException} occurs, a
 * rollback is attempted and the original exception is rethrown. The changes
 * table is dropped in every case. Failures of the rollback or of the drop
 * never replace an {@code SQLException} that is already propagating; they
 * are attached to it via {@link SQLException#setNextException} so that the
 * root cause stays visible to the caller.
 *
 * @param context the task attempt context supplying the job configuration
 * @param mergeColumnNames not referenced by this method; retained so the
 *        signature stays compatible with existing callers
 * @throws SQLException if creating, applying, committing or dropping the
 *         changes table fails
 */
private void updateMainExportTableFromUniqueMapperTable(
    TaskAttemptContext context, String[] mergeColumnNames)
    throws SQLException {

  String schema =
      context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_OWNER);
  String localTableName =
      context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_NAME);
  OracleTable targetTable = new OracleTable(schema, localTableName);

  Object sysDateTime = getJobSysDate(context);
  OracleTable changesTable =
      OraOopUtilities.generateExportTableMapperTableName(
          Integer.toString(this.mapperId) + "_CHG", sysDateTime, null);

  boolean parallelizationEnabled =
      OraOopUtilities.enableOracleParallelProcessingDuringExport(
          context.getConfiguration());

  ExportTableUpdateTechnique exportTableUpdateTechnique =
      getExportTableUpdateTechnique();

  // Techniques that can add rows to the target table need the new rows in
  // the changes table as well; the others only need the rows that differ.
  OraOopOracleQueries.CreateExportChangesTableOptions changesTableOptions;
  switch (exportTableUpdateTechnique) {
    case ReInsertUpdatedRows:
    case UpdateSql:
      changesTableOptions =
          CreateExportChangesTableOptions.OnlyRowsThatDiffer;
      break;

    case ReInsertUpdatedRowsAndNewRows:
    case MergeSql:
      changesTableOptions =
          CreateExportChangesTableOptions.RowsThatDifferPlusNewRows;
      break;

    default:
      // getCurrentMethodName() is deliberately called inline here so that
      // it is evaluated from within this method.
      throw new RuntimeException(String.format(
          "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
          OraOopUtilities.getCurrentMethodName(),
          exportTableUpdateTechnique.toString()));
  }

  String temporaryTableStorageClause =
      OraOopUtilities.getTemporaryTableStorageClause(
          context.getConfiguration());

  // Remembers the SQLException that is propagating out of the try block (if
  // any) so that clean-up failures can be chained to it instead of
  // replacing it.
  SQLException failure = null;

  try {
    int changeTableRowCount =
        OraOopOracleQueries.createExportChangesTable(this.getConnection(),
            changesTable, temporaryTableStorageClause, this.oracleTable,
            targetTable, this.updateColumnNames, changesTableOptions,
            parallelizationEnabled);

    if (changeTableRowCount == 0) {
      LOG.debug(String.format(
          "The changes-table does not contain any rows. %s is now exiting.",
          OraOopUtilities.getCurrentMethodName()));
      // The finally block below still drops the changes table.
      return;
    }

    switch (exportTableUpdateTechnique) {
      case ReInsertUpdatedRows:
      case ReInsertUpdatedRowsAndNewRows:
        // Remove the affected rows from the target table, then insert the
        // rows held in the changes table.
        OraOopOracleQueries.deleteRowsFromTable(this.getConnection(),
            targetTable, changesTable, this.updateColumnNames,
            parallelizationEnabled);

        OraOopOracleQueries.insertRowsIntoExportTable(this.getConnection(),
            targetTable, changesTable, sysDateTime, this.mapperId,
            parallelizationEnabled);
        break;

      case UpdateSql:
        long start = System.nanoTime();

        OraOopOracleQueries.updateTable(this.getConnection(), targetTable,
            changesTable, this.updateColumnNames,
            this.getOracleTableColumns(), sysDateTime, this.mapperId,
            parallelizationEnabled);

        double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
        LOG.info(String.format("Time spent performing an update: %f sec.",
            timeInSec));
        break;

      case MergeSql:
        long mergeStart = System.nanoTime();

        OraOopOracleQueries.mergeTable(this.getConnection(), targetTable,
            changesTable, this.updateColumnNames,
            this.getOracleTableColumns(), sysDateTime, this.mapperId,
            parallelizationEnabled);

        double mergeTimeInSec =
            (System.nanoTime() - mergeStart) / Math.pow(10, 9);
        LOG.info(String.format("Time spent performing a merge: %f sec.",
            mergeTimeInSec));
        break;

      default:
        throw new RuntimeException(String.format(
            "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
            OraOopUtilities.getCurrentMethodName(),
            exportTableUpdateTechnique.toString()));
    }

    this.getConnection().commit();
  } catch (SQLException ex) {
    failure = ex;
    try {
      this.getConnection().rollback();
    } catch (SQLException rollbackEx) {
      // Keep the original failure as the exception that is thrown; the
      // rollback failure is chained to it rather than masking it.
      ex.setNextException(rollbackEx);
    }
    throw ex;
  } finally {
    try {
      OraOopOracleQueries.dropTable(this.getConnection(), changesTable);
    } catch (SQLException dropEx) {
      if (failure == null) {
        // No SQLException is propagating from the try block, so the
        // failure to drop the changes table is reported as before.
        throw dropEx;
      }
      // An SQLException is already propagating: chain the drop failure to
      // it so the root cause is not replaced.
      failure.setNextException(dropEx);
    }
  }
}