public void runExport()

in odps-sqoop/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java [343:480]
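
Runs the table export as a MapReduce job: performs runtime compatibility checks, optionally stages the data in an intermediate table, configures and submits the job, and, on success, migrates the staged rows into the destination table.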


  public void runExport() throws ExportException, IOException {

    ConnManager cmgr = context.getConnManager();
    SqoopOptions options = context.getOptions();
    Configuration conf = options.getConf();

    String outputTableName = context.getTableName();
    String stagingTableName = context.getOptions().getStagingTableName();

    String tableName = outputTableName;
    boolean stagingEnabled = false;

    // Runtime error checks: fail fast on unsupported --direct combinations.
    if (isHCatJob && options.isDirect()
        && !context.getConnManager().isDirectModeHCatSupported()) {
      throw new IOException("Direct export is not compatible with "
        + "HCatalog operations using the connection manager "
        + context.getConnManager().getClass().getName()
        + ". Please remove the parameter --direct");
    }
    if (options.getAccumuloTable() != null && options.isDirect()
        && !cmgr.isDirectModeAccumuloSupported()) {
      throw new IOException("Direct mode is incompatible with "
            + "Accumulo. Please remove the parameter --direct");
    }
    if (options.getHBaseTable() != null && options.isDirect()
        && !cmgr.isDirectModeHBaseSupported()) {
      throw new IOException("Direct mode is incompatible with "
            + "HBase. Please remove the parameter --direct");
    }
    if (stagingTableName != null) { // user has specified the staging table
      if (cmgr.supportsStagingForExport()) {
        LOG.info("Data will be staged in the table: " + stagingTableName);
        tableName = stagingTableName;
        stagingEnabled = true;
      } else {
        throw new ExportException("The active connection manager ("
            + cmgr.getClass().getCanonicalName()
            + ") does not support staging of data for export. "
            + "Please retry without specifying the --staging-table option.");
      }
    }

    // For self-managed ORM facilities we leave tableClassName null so
    // that we don't look for a generated class that doesn't exist.
    String tableClassName = null;
    if (!cmgr.isORMFacilitySelfManaged()) {
      tableClassName =
          new TableClassName(options).getClassForTable(outputTableName);
    }
    String ormJarFile = context.getJarFile();

    LOG.info("Beginning export of " + outputTableName);
    loadJars(conf, ormJarFile, tableClassName);

    if (stagingEnabled) {
      // Prepare the staging table
      if (options.doClearStagingTable()) {
        try {
          // Delete all records from staging table
          cmgr.deleteAllRecords(stagingTableName);
        } catch (SQLException ex) {
          throw new ExportException(
              "Failed to empty staging table before export run", ex);
        }
      } else {
        // User has not explicitly specified the clear staging table option.
        // Assert that the staging table is empty.
        try {
          long rowCount = cmgr.getTableRowCount(stagingTableName);
          if (rowCount != 0L) {
            throw new ExportException("The specified staging table ("
                + stagingTableName + ") is not empty. To force deletion of "
                + "its data, please retry with --clear-staging-table option.");
          }
        } catch (SQLException ex) {
          throw new ExportException(
              "Failed to count data rows in staging table: "
                  + stagingTableName, ex);
        }
      }
    }
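    // Create and configure the MapReduce job that performs the export.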
    Job job = createJob(conf);
    try {
      // Set the external jar to use for the job.
      job.getConfiguration().set("mapred.jar", ormJarFile);
      if (options.getMapreduceJobName() != null) {
        job.setJobName(options.getMapreduceJobName());
      }

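      // Propagate SqoopOptions settings into the job configuration.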
      propagateOptionsToJob(job);
      if (isHCatJob) {
        LOG.info("Configuring HCatalog for export job");
        SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
        hCatUtils.configureHCat(options, job, cmgr, tableName,
          job.getConfiguration());
      }
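      // Wire up the input/output formats, the mapper, and the task count.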
      configureInputFormat(job, tableName, tableClassName, null);
      configureOutputFormat(job, tableName, tableClassName);
      configureMapper(job, tableName, tableClassName);
      configureNumTasks(job);
      cacheJars(job, context.getConnManager());

      jobSetup(job);
      setJob(job);
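      // Submit the job and block until it completes.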
      boolean success = runJob(job);
      if (!success) {
        throw new ExportException("Export job failed!");
      }

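      // Run post-export validation if the user requested it (--validate).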
      if (options.isValidationEnabled()) {
        validateExport(tableName, conf, job);
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    } catch (ClassNotFoundException cnfe) {
      throw new IOException(cnfe);
    } finally {
      unloadJars();
      jobTeardown(job);
    }

    // Unstage the data if needed
    if (stagingEnabled) {
      // Migrate data from staging table to the output table
      try {
        LOG.info("Starting to migrate data from staging table to destination.");
        cmgr.migrateData(stagingTableName, outputTableName);
      } catch (SQLException ex) {
        LoggingUtils.logAll(LOG, "Failed to move data from staging table ("
          + stagingTableName + ") to target table ("
          + outputTableName + ")", ex);
        throw new ExportException(
            "Failed to move data from staging table", ex);
      }
    }
  }
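
The staging branch above only runs when the active ConnManager advertises support for it. As a rough illustration, here is a minimal sketch of a manager that opts in; the overridden method names match the call sites in runExport(), but the base class, package names, and the SQL in migrateData are illustrative assumptions, not any shipped manager's behavior.

import java.sql.SQLException;
import java.sql.Statement;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.GenericJdbcManager;

// Sketch only: a connection manager that opts in to export staging.
public class StagingAwareManager extends GenericJdbcManager {

  public StagingAwareManager(String driverClass, SqoopOptions opts) {
    super(driverClass, opts);
  }

  @Override
  public boolean supportsStagingForExport() {
    // Without this, runExport() rejects --staging-table outright.
    return true;
  }

  @Override
  public void migrateData(String fromTable, String toTable)
      throws SQLException {
    // runExport() calls this only after the MapReduce job succeeds, so a
    // failed export never leaves partial rows in the destination table.
    // Illustrative SQL; a real manager must handle quoting and atomicity.
    Statement stmt = getConnection().createStatement();
    try {
      stmt.executeUpdate("INSERT INTO " + escapeTableName(toTable)
          + " SELECT * FROM " + escapeTableName(fromTable));
      stmt.executeUpdate("DELETE FROM " + escapeTableName(fromTable));
      getConnection().commit();
    } finally {
      stmt.close();
    }
  }
}

In stock Sqoop the SqlManager base class typically already implements getTableRowCount and deleteAllRecords (used above for the --clear-staging-table checks), so supportsStagingForExport and migrateData are usually the only staging-specific pieces a manager has to add.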