private void updateMainExportTableFromUniqueMapperTable()

in odps-sqoop/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatUpdate.java [242:363]


    /**
     * Applies the differences between this mapper's table
     * ({@code this.oracleTable}) and the main export table to the main export
     * table.
     *
     * <p>The main export table is identified by the
     * {@code OraOopConstants.ORAOOP_TABLE_OWNER} and
     * {@code OraOopConstants.ORAOOP_TABLE_NAME} configuration entries. A
     * "changes" table (named from the mapper id plus {@code "_CHG"}) is created
     * first; if it contains no rows the method returns without touching the
     * main table. Otherwise the changes are applied according to
     * {@link #getExportTableUpdateTechnique()} (delete + re-insert, UPDATE or
     * MERGE) and the transaction is committed.
     *
     * <p>If the work does not complete normally the transaction is rolled back
     * <em>before</em> the changes table is dropped, and failures of the
     * rollback or the drop are logged rather than thrown so that they cannot
     * hide the original exception. The changes table is always dropped.
     *
     * @param context the task attempt whose configuration supplies the target
     *        table and tuning options
     * @param mergeColumnNames NOTE(review): never read by this method - the
     *        key columns are taken from {@code this.updateColumnNames}
     *        instead; confirm against the caller whether that is intended
     * @throws SQLException if creating the changes table, applying the
     *         changes, committing, or (after a successful commit) dropping the
     *         changes table fails
     * @throws RuntimeException if the configured update technique is not
     *         handled by this method
     */
    private void updateMainExportTableFromUniqueMapperTable(
        TaskAttemptContext context, String[] mergeColumnNames)
        throws SQLException {

      String schema =
          context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_OWNER);
      String localTableName =
          context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_NAME);
      OracleTable targetTable = new OracleTable(schema, localTableName);

      Object sysDateTime = getJobSysDate(context);
      OracleTable changesTable =
          OraOopUtilities.generateExportTableMapperTableName(Integer
              .toString(this.mapperId)
              + "_CHG", sysDateTime, null);

      OraOopOracleQueries.CreateExportChangesTableOptions changesTableOptions;
      boolean parallelizationEnabled =
          OraOopUtilities.enableOracleParallelProcessingDuringExport(context
              .getConfiguration());

      // Techniques that can add rows to the target need the new rows in the
      // changes table as well; pure updates only need the rows that differ.
      ExportTableUpdateTechnique exportTableUpdateTechnique =
          getExportTableUpdateTechnique();
      switch (exportTableUpdateTechnique) {

        case ReInsertUpdatedRows:
        case UpdateSql:
          changesTableOptions =
              CreateExportChangesTableOptions.OnlyRowsThatDiffer;
          break;

        case ReInsertUpdatedRowsAndNewRows:
        case MergeSql:
          changesTableOptions =
              CreateExportChangesTableOptions.RowsThatDifferPlusNewRows;
          break;

        default:
          throw new RuntimeException(String.format(
              "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
              OraOopUtilities.getCurrentMethodName(),
              exportTableUpdateTechnique.toString()));
      }

      String temporaryTableStorageClause =
          OraOopUtilities.getTemporaryTableStorageClause(context
              .getConfiguration());

      // Set to true only on the two normal exits (nothing to do / committed).
      // Anything else - SQLException, RuntimeException or Error - leaves it
      // false so that the finally block rolls back before dropping the table.
      boolean completedNormally = false;

      try {
        int changeTableRowCount =
            OraOopOracleQueries.createExportChangesTable(this.getConnection(),
                changesTable, temporaryTableStorageClause, this.oracleTable,
                targetTable, this.updateColumnNames, changesTableOptions,
                parallelizationEnabled);

        if (changeTableRowCount == 0) {
          LOG.debug(String.format(
              "The changes-table does not contain any rows. %s is now exiting.",
                  OraOopUtilities.getCurrentMethodName()));
          completedNormally = true;
          return;
        }

        switch (exportTableUpdateTechnique) {

          case ReInsertUpdatedRows:
          case ReInsertUpdatedRowsAndNewRows:

            // Replace the affected rows: remove the stale versions from the
            // target, then insert the versions held in the changes table.
            OraOopOracleQueries.deleteRowsFromTable(this.getConnection(),
                targetTable, changesTable, this.updateColumnNames,
                parallelizationEnabled);

            OraOopOracleQueries.insertRowsIntoExportTable(this.getConnection(),
                targetTable, changesTable, sysDateTime, this.mapperId,
                parallelizationEnabled);
            break;

          case UpdateSql:

            long start = System.nanoTime();

            OraOopOracleQueries.updateTable(this.getConnection(), targetTable,
                changesTable, this.updateColumnNames, this
                    .getOracleTableColumns(), sysDateTime, this.mapperId,
                parallelizationEnabled);

            double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
            LOG.info(String.format("Time spent performing an update: %f sec.",
                timeInSec));
            break;

          case MergeSql:

            long mergeStart = System.nanoTime();

            OraOopOracleQueries.mergeTable(this.getConnection(), targetTable,
                changesTable, this.updateColumnNames, this
                    .getOracleTableColumns(), sysDateTime, this.mapperId,
                parallelizationEnabled);

            double mergeTimeInSec = (System.nanoTime() - mergeStart)
                / Math.pow(10, 9);
            LOG.info(String.format("Time spent performing a merge: %f sec.",
                mergeTimeInSec));

            break;

          default:
            throw new RuntimeException(
              String.format(
                "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
                        OraOopUtilities.getCurrentMethodName(),
                        exportTableUpdateTechnique.toString()));
        }

        this.getConnection().commit();
        completedNormally = true;
      } finally {
        if (!completedNormally) {
          // Roll back BEFORE dropping the changes table. Presumably dropTable
          // issues a DROP TABLE; Oracle implicitly commits around DDL, which
          // would otherwise make a half-applied change permanent.
          // A rollback failure is logged, not thrown, so the exception that
          // brought us here is the one the caller sees.
          try {
            this.getConnection().rollback();
          } catch (SQLException rollbackEx) {
            LOG.error(String.format(
                "Unable to roll back the changes made to %s by mapper %s.",
                targetTable.toString(), Integer.toString(this.mapperId)),
                rollbackEx);
          }
        }

        try {
          OraOopOracleQueries.dropTable(this.getConnection(), changesTable);
        } catch (SQLException dropEx) {
          if (completedNormally) {
            // Nothing else is propagating: report the drop failure as before.
            throw dropEx;
          }
          // An earlier failure is already propagating; do not mask it.
          LOG.error(String.format(
              "Unable to drop the changes-table %s.",
              changesTable.toString()), dropEx);
        }
      }
    }