in odps-sqoop/src/java/org/apache/sqoop/tool/ImportTool.java [283:425]
private boolean initIncrementalConstraints(SqoopOptions options,
ImportJobContext context) throws ImportException, IOException {
// If this is an incremental import, determine the constraints
// to inject in the WHERE clause or $CONDITIONS for a query.
// Also modify the 'last value' field of the SqoopOptions to
// specify the current job start time / start row.
if (!isIncremental(options)) {
return true;
}
FileSystem fs = FileSystem.get(options.getConf());
SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
String nextIncrementalValue = null;
Object nextVal;
switch (incrementalMode) {
case AppendRows:
try {
nextVal = getMaxColumnId(options);
if (isDateTimeColumn(checkColumnType)) {
nextIncrementalValue = (nextVal == null) ? null
: manager.datetimeToQueryString(nextVal.toString(),
checkColumnType);
} else if (manager.isCharColumn(checkColumnType)) {
throw new ImportException("Character column "
+ "(" + options.getIncrementalTestColumn() + ") can not be used "
+ "to determine which rows to incrementally import.");
} else {
nextIncrementalValue = (nextVal == null) ? null : nextVal.toString();
}
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}
break;
case DateLastModified:
if (options.getMergeKeyCol() == null && !options.isAppendMode()
&& fs.exists(getOutputPath(options, context.getTableName(), false))) {
throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
+ " is required when using --" + this.INCREMENT_TYPE_ARG
+ " lastmodified and the output directory exists.");
}
checkColumnType = manager.getColumnTypes(options.getTableName(),
options.getSqlQuery()).get(options.getIncrementalTestColumn());
nextVal = manager.getCurrentDbTimestamp();
if (null == nextVal) {
throw new IOException("Could not get current time from database");
}
nextIncrementalValue = manager.datetimeToQueryString(nextVal.toString(),
checkColumnType);
break;
default:
throw new ImportException("Undefined incremental import type: "
+ incrementalMode);
}
// Build the WHERE clause components that are used to import
// only this incremental section.
StringBuilder sb = new StringBuilder();
String prevEndpoint = options.getIncrementalLastValue();
if (isDateTimeColumn(checkColumnType) && null != prevEndpoint
&& !prevEndpoint.startsWith("\'") && !prevEndpoint.endsWith("\'")) {
// Incremental imports based on date/time should be 'quoted' in
// ANSI SQL. If the user didn't specify single-quotes, put them
// around, here.
prevEndpoint = manager.datetimeToQueryString(prevEndpoint,
checkColumnType);
}
String checkColName = manager.escapeColName(
options.getIncrementalTestColumn());
LOG.info("Incremental import based on column " + checkColName);
if (null != prevEndpoint) {
if (prevEndpoint.equals(nextIncrementalValue)) {
LOG.info("No new rows detected since last import.");
return false;
}
LOG.info("Lower bound value: " + prevEndpoint);
sb.append(checkColName);
switch (incrementalMode) {
case AppendRows:
sb.append(" > ");
break;
case DateLastModified:
sb.append(" >= ");
break;
default:
throw new ImportException("Undefined comparison");
}
sb.append(prevEndpoint);
sb.append(" AND ");
}
if (null != nextIncrementalValue) {
sb.append(checkColName);
switch (incrementalMode) {
case AppendRows:
sb.append(" <= ");
break;
case DateLastModified:
sb.append(" < ");
break;
default:
throw new ImportException("Undefined comparison");
}
sb.append(nextIncrementalValue);
} else {
sb.append(checkColName);
sb.append(" IS NULL ");
}
LOG.info("Upper bound value: " + nextIncrementalValue);
if (options.getTableName() != null) {
// Table based import
String prevWhereClause = options.getWhereClause();
if (null != prevWhereClause) {
sb.append(" AND (");
sb.append(prevWhereClause);
sb.append(")");
}
String newConstraints = sb.toString();
options.setWhereClause(newConstraints);
} else {
// Incremental based import
sb.append(" AND $CONDITIONS");
String newQuery = options.getSqlQuery().replace(
"$CONDITIONS", sb.toString());
options.setSqlQuery(newQuery);
}
// Save this state for next time.
SqoopOptions recordOptions = options.getParent();
if (null == recordOptions) {
recordOptions = options;
}
recordOptions.setIncrementalLastValue(
(nextVal == null) ? null : nextVal.toString());
return true;
}