private boolean initIncrementalConstraints()

in odps-sqoop/src/java/org/apache/sqoop/tool/ImportTool.java [283:425]


  private boolean initIncrementalConstraints(SqoopOptions options,
      ImportJobContext context) throws ImportException, IOException {

    // If this is an incremental import, determine the constraints
    // to inject in the WHERE clause or $CONDITIONS for a query.
    // Also modify the 'last value' field of the SqoopOptions to
    // specify the current job start time / start row.

    if (!isIncremental(options)) {
      return true;
    }

    FileSystem fs = FileSystem.get(options.getConf());
    SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
    String nextIncrementalValue = null;

    Object nextVal;
    switch (incrementalMode) {
    case AppendRows:
      try {
        nextVal = getMaxColumnId(options);
        if (isDateTimeColumn(checkColumnType)) {
          nextIncrementalValue = (nextVal == null) ? null
            : manager.datetimeToQueryString(nextVal.toString(),
                                            checkColumnType);
        } else if (manager.isCharColumn(checkColumnType)) {
          throw new ImportException("Character column "
            + "(" + options.getIncrementalTestColumn() + ") can not be used "
            + "to determine which rows to incrementally import.");
        } else {
          nextIncrementalValue = (nextVal == null) ? null : nextVal.toString();
        }
      } catch (SQLException sqlE) {
        throw new IOException(sqlE);
      }
      break;
    case DateLastModified:
      if (options.getMergeKeyCol() == null && !options.isAppendMode()
          && fs.exists(getOutputPath(options, context.getTableName(), false))) {
        throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
          + " is required when using --" + this.INCREMENT_TYPE_ARG
          + " lastmodified and the output directory exists.");
      }
      checkColumnType = manager.getColumnTypes(options.getTableName(),
        options.getSqlQuery()).get(options.getIncrementalTestColumn());
      nextVal = manager.getCurrentDbTimestamp();
      if (null == nextVal) {
        throw new IOException("Could not get current time from database");
      }
      nextIncrementalValue = manager.datetimeToQueryString(nextVal.toString(),
          checkColumnType);
      break;
    default:
      throw new ImportException("Undefined incremental import type: "
          + incrementalMode);
    }

    // Build the WHERE clause components that are used to import
    // only this incremental section.
    StringBuilder sb = new StringBuilder();
    String prevEndpoint = options.getIncrementalLastValue();

    if (isDateTimeColumn(checkColumnType) && null != prevEndpoint
        && !prevEndpoint.startsWith("\'") && !prevEndpoint.endsWith("\'")) {
      // Incremental imports based on date/time should be 'quoted' in
      // ANSI SQL. If the user didn't specify single-quotes, put them
      // around, here.
      prevEndpoint = manager.datetimeToQueryString(prevEndpoint,
          checkColumnType);
    }

    String checkColName = manager.escapeColName(
        options.getIncrementalTestColumn());
    LOG.info("Incremental import based on column " + checkColName);
    if (null != prevEndpoint) {
      if (prevEndpoint.equals(nextIncrementalValue)) {
        LOG.info("No new rows detected since last import.");
        return false;
      }
      LOG.info("Lower bound value: " + prevEndpoint);
      sb.append(checkColName);
      switch (incrementalMode) {
      case AppendRows:
        sb.append(" > ");
        break;
      case DateLastModified:
        sb.append(" >= ");
        break;
      default:
        throw new ImportException("Undefined comparison");
      }
      sb.append(prevEndpoint);
      sb.append(" AND ");
    }

    if (null != nextIncrementalValue) {
      sb.append(checkColName);
      switch (incrementalMode) {
      case AppendRows:
        sb.append(" <= ");
        break;
      case DateLastModified:
        sb.append(" < ");
        break;
      default:
        throw new ImportException("Undefined comparison");
      }
      sb.append(nextIncrementalValue);
    } else {
      sb.append(checkColName);
      sb.append(" IS NULL ");
    }

    LOG.info("Upper bound value: " + nextIncrementalValue);

    if (options.getTableName() != null) {
      // Table based import
      String prevWhereClause = options.getWhereClause();
      if (null != prevWhereClause) {
        sb.append(" AND (");
        sb.append(prevWhereClause);
        sb.append(")");
      }

      String newConstraints = sb.toString();
      options.setWhereClause(newConstraints);
    } else {
      // Incremental based import
      sb.append(" AND $CONDITIONS");
      String newQuery = options.getSqlQuery().replace(
        "$CONDITIONS", sb.toString());
      options.setSqlQuery(newQuery);
    }
    // Save this state for next time.
    SqoopOptions recordOptions = options.getParent();
    if (null == recordOptions) {
      recordOptions = options;
    }
    recordOptions.setIncrementalLastValue(
        (nextVal == null) ? null : nextVal.toString());

    return true;
  }