in plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java [317:412]
/**
 * Builds range conditions for splitting an Oracle table read by sampling
 * values of the split key.
 *
 * <p>The method runs a {@code SAMPLE}-based query that picks at most
 * {@code adviceNum} random non-null values of {@code splitPK} (restricted by
 * {@code where} when it is not blank), ordered ascending, and hands those
 * values to {@code RdbmsRangeSplitWrap} to produce the range conditions.
 * Once the sampling query has been issued successfully,
 * {@code Constant.PK_TYPE} is set to {@code Constant.PK_TYPE_MONTECARLO} on
 * {@code configuration}.
 *
 * <p>The connection opened here is closed before the method returns,
 * whether it succeeds or fails.
 *
 * @param splitPK       name of the split key column
 * @param table         table to sample from
 * @param where         optional extra filter; ignored when blank
 * @param configuration job configuration; read for the sample percentage,
 *                      fetch size and JDBC credentials, must not be null
 * @param adviceNum     requested number of splits, must be at least 1
 * @return {@code null} when {@code adviceNum} is 1; otherwise the list of
 *         range conditions, which is empty when fewer than two sample
 *         points were returned by the database
 * @throws IllegalArgumentException if {@code adviceNum} is less than 1
 * @throws DataXException if the sampling query fails, or if the split key
 *         column type is neither accepted by {@code isLongType} nor by
 *         {@code isStringType}
 */
public static List<String> genSplitSqlForOracle(String splitPK,
        String table, String where, Configuration configuration,
        int adviceNum) {
    if (adviceNum < 1) {
        throw new IllegalArgumentException(String.format(
                "切分份数不能小于1. 此处:adviceNum=[%s].", adviceNum));
    } else if (adviceNum == 1) {
        return null;
    }

    // Rows with a NULL split key are excluded from the sample.
    String whereSql = String.format("%s IS NOT NULL", splitPK);
    if (StringUtils.isNotBlank(where)) {
        whereSql = String.format(" WHERE (%s) AND (%s) ", whereSql, where);
    } else {
        whereSql = String.format(" WHERE (%s) ", whereSql);
    }

    Double percentage = configuration.getDouble(Key.SAMPLE_PERCENTAGE, 0.1);
    String sampleSqlTemplate = "SELECT * FROM ( SELECT %s FROM %s SAMPLE (%s) %s ORDER BY DBMS_RANDOM.VALUE) WHERE ROWNUM <= %s ORDER by %s ASC";
    String splitSql = String.format(sampleSqlTemplate, splitPK, table,
            percentage, whereSql, adviceNum, splitPK);

    int fetchSize = configuration.getInt(Constant.FETCH_SIZE, 32);
    String jdbcURL = configuration.getString(Key.JDBC_URL);
    String username = configuration.getString(Key.USERNAME);
    String password = configuration.getString(Key.PASSWORD);
    Connection conn = DBUtil.getConnection(DATABASE_TYPE, jdbcURL,
            username, password);
    LOG.info("split pk [sql={}] is running... ", splitSql);

    ResultSet rs = null;
    List<Pair<Object, Integer>> splitedRange = new ArrayList<Pair<Object, Integer>>();
    try {
        try {
            rs = DBUtil.query(conn, splitSql, fetchSize);
        } catch (Exception e) {
            throw RdbmsException.asQueryException(DATABASE_TYPE, e,
                    splitSql, table, username);
        }
        // configuration was already dereferenced above, so it cannot be
        // null here; no additional null check is needed.
        configuration.set(Constant.PK_TYPE, Constant.PK_TYPE_MONTECARLO);

        ResultSetMetaData rsMetaData = rs.getMetaData();
        while (DBUtil.asyncResultSetNext(rs)) {
            // Each sample point keeps its value together with the JDBC
            // column type so the type can be inspected afterwards.
            ImmutablePair<Object, Integer> eachPoint = new ImmutablePair<Object, Integer>(
                    rs.getObject(1), rsMetaData.getColumnType(1));
            splitedRange.add(eachPoint);
        }
    } catch (DataXException e) {
        throw e;
    } catch (Exception e) {
        throw DataXException.asDataXException(
                DBUtilErrorCode.ILLEGAL_SPLIT_PK,
                "DataX尝试切分表发生错误. 请检查您的配置并作出修改.", e);
    } finally {
        // The connection is opened by this method, so it must be released
        // here as well; previously only the ResultSet was closed and the
        // connection leaked.
        // NOTE(review): assumes the third parameter of closeDBResources is
        // the Connection (rs, stmt, conn) - confirm against DBUtil.
        DBUtil.closeDBResources(rs, null, conn);
    }

    // Serializing the sample points is only worth doing when it is logged.
    if (LOG.isDebugEnabled()) {
        LOG.debug(JSON.toJSONString(splitedRange));
    }

    List<String> rangeSql = new ArrayList<String>();
    int splitedRangeSize = splitedRange.size();
    // The sample may contain 0 or 1 points; a range cannot be built from
    // fewer than 2 points, so in that case the returned list stays empty.
    // Example of the failure this guard avoids:
    // "Parameter rangeResult can not be null and its length can not <2.
    // detail:rangeResult=[24999930]."
    if (splitedRangeSize >= 2) {
        // The column type of the first sample point decides how all
        // points are interpreted.
        int pkColumnType = splitedRange.get(0).getRight();
        if (isLongType(pkColumnType)) {
            // Oracle NUMBER is handled as an integer type here.
            BigInteger[] integerPoints = new BigInteger[splitedRangeSize];
            for (int i = 0; i < splitedRangeSize; i++) {
                integerPoints[i] = new BigInteger(splitedRange.get(i)
                        .getLeft().toString());
            }
            rangeSql.addAll(RdbmsRangeSplitWrap.wrapRange(integerPoints,
                    splitPK));
            // Additional condition built from the smallest and largest
            // sample points (presumably covering rows outside the sampled
            // range - see RdbmsRangeSplitWrap.wrapFirstLastPoint).
            rangeSql.add(RdbmsRangeSplitWrap.wrapFirstLastPoint(
                    integerPoints[0], integerPoints[splitedRangeSize - 1],
                    splitPK));
        } else if (isStringType(pkColumnType)) {
            // Every other supported type is treated as a string.
            String[] stringPoints = new String[splitedRangeSize];
            for (int i = 0; i < splitedRangeSize; i++) {
                stringPoints[i] = splitedRange.get(i).getLeft().toString();
            }
            rangeSql.addAll(RdbmsRangeSplitWrap.wrapRange(stringPoints,
                    splitPK, "'", DATABASE_TYPE));
            // Additional condition built from the smallest and largest
            // sample points (presumably covering rows outside the sampled
            // range - see RdbmsRangeSplitWrap.wrapFirstLastPoint).
            rangeSql.add(RdbmsRangeSplitWrap.wrapFirstLastPoint(
                    stringPoints[0], stringPoints[splitedRangeSize - 1],
                    splitPK, "'", DATABASE_TYPE));
        } else {
            throw DataXException
                    .asDataXException(
                            DBUtilErrorCode.ILLEGAL_SPLIT_PK,
                            "您配置的DataX切分主键(splitPk)有误. 因为您配置的切分主键(splitPk) 类型 DataX 不支持. DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系 DBA 进行处理.");
        }
    }
    return rangeSql;
}