public ConnManager accept()

in odps-sqoop/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java [67:272]


  /**
   * Decides whether OraOop takes responsibility for the given Sqoop job.
   *
   * <p>A job is only considered when its connect string (trimmed, compared
   * case-insensitively) starts with {@code jdbc:oracle} and
   * {@code isOraOopEnabled} returns true for its options. Import and export
   * jobs are then checked against the Oracle database itself; any other job
   * type is declined.
   *
   * <p>When the job is accepted, this method also records job-level settings
   * in the job's configuration (desired number of mappers, Oracle session
   * action name, Oracle database version, and - when consistent read is
   * enabled - the SCN to read at) before returning.
   *
   * @param jobData the Sqoop job being offered; when {@code null} the job is
   *        declined
   * @return the {@code OraOopConnManager} that will handle the job, or
   *         {@code null} if OraOop does not accept it
   * @throws RuntimeException if a {@link SQLException} occurs while
   *         connecting to (or, for imports, inspecting) the Oracle database,
   *         or while determining the SCN for a consistent read
   */
  public ConnManager accept(JobData jobData) {

    // Only dereference jobData once it is known to be non-null; a null job
    // is declined below instead of failing with a NullPointerException.
    if (jobData != null) {
      OraOopUtilities.enableDebugLoggingIfRequired(jobData.getSqoopOptions()
          .getConf());
    }

    LOG.debug(String.format("%s can be called by Sqoop!",
        OraOopConstants.ORAOOP_PRODUCT_NAME));

    if (jobData == null) {
      return null;
    }

    SqoopOptions sqoopOptions = jobData.getSqoopOptions();

    // OraOop is only a candidate for Oracle JDBC connect strings.
    String connectString = sqoopOptions.getConnectString();
    if (connectString == null
        || !connectString.toLowerCase().trim().startsWith("jdbc:oracle")) {
      return null;
    }

    if (!isOraOopEnabled(sqoopOptions)) {
      return null;
    }

    ConnManager result = null;
    OraOopConnManager oraOopConnManager = null;

    OraOopConstants.Sqoop.Tool jobType = getSqoopJobType(jobData);
    OraOopUtilities.rememberSqoopJobType(jobType, jobData.getSqoopOptions()
        .getConf());

    // Messages collected while preparing the job; they are only logged if
    // the job is accepted, after the welcome message.
    List<OraOopLogMessage> messagesToDisplayAfterWelcome =
        new ArrayList<OraOopLogMessage>();

    switch (jobType) {

      case IMPORT:
        if (isNumberOfImportMappersOkay(sqoopOptions)
            && !isSqoopImportIncremental(jobData)
            && isSqoopImportJobTableBased(sqoopOptions)) {

          // At this stage, the Sqoop import job appears to be one we're
          // interested in accepting. We now need to connect to
          // the Oracle database to perform more tests...

          oraOopConnManager = new OraOopConnManager(sqoopOptions);

          try {
            Connection connection = oraOopConnManager.getConnection();

            if (isSqoopTableAnOracleTable(connection, sqoopOptions
                .getUsername(),
                oraOopConnManager.getOracleTableContext())) {

              // OraOop will not accept responsibility for an Index
              // Organized Table (IOT) unless the data chunk method is
              // PARTITION...
              if (!isSqoopTableAnIndexOrganizedTable(connection,
                  oraOopConnManager.getOracleTableContext())) {
                result = oraOopConnManager; // <- OraOop accepts
                                            // responsibility for this Sqoop
                                            // job!
              } else {
                OraOopConstants.OraOopOracleDataChunkMethod method =
                    OraOopUtilities.getOraOopOracleDataChunkMethod(
                        sqoopOptions.getConf());
                if (method == OraOopConstants.
                                  OraOopOracleDataChunkMethod.PARTITION) {
                  result = oraOopConnManager;
                } else {
                  LOG.info(String.format("%s will not process this Sqoop"
                     + " connection, as the Oracle table %s is an"
                     + " index-organized table. If the table is"
                     + " partitioned, set "
                     + OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD
                     + " to "
                     + OraOopConstants.OraOopOracleDataChunkMethod.PARTITION
                     + ".",
                     OraOopConstants.ORAOOP_PRODUCT_NAME,
                     oraOopConnManager.getOracleTableContext().toString()));
                }
              }
            }
          } catch (SQLException ex) {
            throw new RuntimeException(String.format(
                "Unable to connect to the Oracle database at %s\n"
                    + "Error:%s", sqoopOptions.getConnectString(), ex
                    .getMessage()), ex);
          }
        }
        break;

      case EXPORT:
        if (isNumberOfExportMappersOkay(sqoopOptions)) {

          // At this stage, the Sqoop export job appears to be one we're
          // interested in accepting. We now need to connect to
          // the Oracle database to perform more tests...

          oraOopConnManager = new OraOopConnManager(sqoopOptions);

          Connection connection = null;
          try {
            connection = oraOopConnManager.getConnection();
          } catch (SQLException ex) {
            throw new RuntimeException(String.format(
                "Unable to connect to the Oracle database at %s\n"
                    + "Error:%s", sqoopOptions.getConnectString(), ex
                    .getMessage()), ex);
          }

          try {

            createAnyRequiredOracleObjects(sqoopOptions, connection,
                oraOopConnManager, messagesToDisplayAfterWelcome);

            if (isSqoopTableAnOracleTable(connection, sqoopOptions
                .getUsername(),
                oraOopConnManager.getOracleTableContext())) {

              result = oraOopConnManager; // <- OraOop accepts
                                          // responsibility for this Sqoop
                                          // job!
            }

          } catch (SQLException ex) {
            // A failure here is logged and the job is simply not accepted
            // (result stays null) rather than aborting.
            LOG.error(OraOopUtilities.getFullExceptionMessage(ex));
          }
        }

        break;
      default:
        // OraOop doesn't know how to handle other types of jobs - so won't
        // accept them.
        break;
    }

    if (result == null) {
      // NOTE(review): if an OraOopConnManager was created above but the job
      // was declined, it is not closed in this method - confirm whether its
      // connection needs to be released here.
      return null;
    }

    // OraOop has accepted this Sqoop job...
    showUserTheOraOopWelcomeMessage();

    for (OraOopLogMessage message : messagesToDisplayAfterWelcome) {
      message.log(LOG);
    }

    // By the time we get into getSplits(), the number of mappers
    // stored in the config can be either 4 or 1 - so it seems
    // a bit unreliable. We'll use our own property name to ensure
    // getSplits() gets the correct value...
    sqoopOptions.getConf().setInt(
        OraOopConstants.ORAOOP_DESIRED_NUMBER_OF_MAPPERS,
        sqoopOptions.getNumMappers());

    // Generate the "action" name that we'll assign to our Oracle sessions
    // so that the user knows which Oracle sessions belong to OraOop...
    sqoopOptions.getConf().set(
        OraOopConstants.ORACLE_SESSION_ACTION_NAME,
        getOracleSessionActionName(jobData));

    OraOopUtilities.appendJavaSecurityEgd(sqoopOptions.getConf());

    // Get the Oracle database version. Failure is logged but does not stop
    // the job from being accepted.
    try {
      OracleVersion oracleVersion =
          OraOopOracleQueries.getOracleVersion(result.getConnection());
      LOG.info(String.format("Oracle Database version: %s",
          oracleVersion.getBanner()));
      sqoopOptions.getConf().setInt(
          OraOopConstants.ORAOOP_ORACLE_DATABASE_VERSION_MAJOR,
          oracleVersion.getMajor());
      sqoopOptions.getConf().setInt(
          OraOopConstants.ORAOOP_ORACLE_DATABASE_VERSION_MINOR,
          oracleVersion.getMinor());
    } catch (SQLException ex) {
      LOG.error("Unable to obtain the Oracle database version.", ex);
    }

    // For a consistent read, use the configured SCN, or the database's
    // current SCN when none (0) was configured.
    try {
      if (sqoopOptions.getConf().getBoolean(
          OraOopConstants.ORAOOP_IMPORT_CONSISTENT_READ, false)) {
        long scn =
            sqoopOptions.getConf().getLong(
                OraOopConstants.ORAOOP_IMPORT_CONSISTENT_READ_SCN, 0);
        if (scn == 0) {
          scn = OraOopOracleQueries.getCurrentScn(result.getConnection());
        }
        sqoopOptions.getConf().setLong(
            OraOopConstants.ORAOOP_IMPORT_CONSISTENT_READ_SCN, scn);
        LOG.info("Performing a consistent read using SCN: " + scn);
      }
    } catch (SQLException ex) {
      throw new RuntimeException("Unable to determine SCN of database.",
          ex);
    }

    // Generate the JDBC URLs to be used by each mapper...
    setMapperConnectionDetails(oraOopConnManager, jobData);

    // Show the user the Oracle command that can be used to kill this
    // OraOop job via Oracle...
    showUserTheOracleCommandToKillOraOop(sqoopOptions);

    return result;
  }