in adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/AdsHelper.java [213:321]
/**
 * Submits a {@code LOAD DATA} statement to ADS over JDBC and returns the id of the resulting job.
 *
 * <p>The statement has the form
 * {@code LOAD DATA FROM '<sourcePath>' [OVERWRITE] INTO TABLE <schema>.<table> [PARTITION (<partition>)]}.
 * The source path is wrapped in single quotes and the partition in parentheses unless the caller
 * already supplied them. The connection is opened with the helper's configured URL, schema,
 * user name and password, and is closed before this method returns or throws.
 *
 * <p>NOTE(review): the statement is assembled by string concatenation, so {@code table},
 * {@code partition} and {@code sourcePath} must come from trusted configuration.
 *
 * @param table      target table name (without schema); must not be {@code null}
 * @param partition  partition specification, with or without surrounding parentheses;
 *                   {@code null} or blank means no {@code PARTITION} clause
 * @param sourcePath source path, with or without surrounding single quotes; must not be {@code null}
 * @param overwrite  whether to add the {@code OVERWRITE} keyword
 * @return the value of the first column of the last row returned for the statement
 * @throws AdsException with {@code ADS_LOADDATA_TABLE_NULL}, {@code ADS_LOADDATA_SOURCEPATH_NULL} or
 *                      one of the {@code ADS_CONN_*_NOT_SET} codes when a required value is missing;
 *                      with {@code ADS_LOADDATA_FAILED} when connecting or executing fails; with
 *                      {@code ADS_LOADDATA_JOBID_NOT_AVAIL} when the statement returned no job id
 */
public String loadData(String table, String partition, String sourcePath, boolean overwrite) throws AdsException {
    if (table == null) {
        throw new AdsException(AdsException.ADS_LOADDATA_TABLE_NULL, "ADS LOAD DATA table is null.", null);
    }
    if (sourcePath == null) {
        throw new AdsException(AdsException.ADS_LOADDATA_SOURCEPATH_NULL, "ADS LOAD DATA source path is null.",
                null);
    }
    if (adsURL == null) {
        throw new AdsException(AdsException.ADS_CONN_URL_NOT_SET, "ADS JDBC connection URL was not set.", null);
    }
    if (userName == null) {
        throw new AdsException(AdsException.ADS_CONN_USERNAME_NOT_SET,
                "ADS JDBC connection user name was not set.", null);
    }
    if (password == null) {
        throw new AdsException(AdsException.ADS_CONN_PASSWORD_NOT_SET, "ADS JDBC connection password was not set.",
                null);
    }
    if (schema == null) {
        throw new AdsException(AdsException.ADS_CONN_SCHEMA_NOT_SET, "ADS JDBC connection schema was not set.",
                null);
    }

    // Assemble the LOAD DATA statement.
    StringBuilder sb = new StringBuilder();
    sb.append("LOAD DATA FROM ");
    if (sourcePath.startsWith("'") && sourcePath.endsWith("'")) {
        // Caller already quoted the path.
        sb.append(sourcePath);
    } else {
        sb.append("'").append(sourcePath).append("'");
    }
    if (overwrite) {
        sb.append(" OVERWRITE");
    }
    sb.append(" INTO TABLE ");
    sb.append(schema + "." + table);
    if (partition != null) {
        String partitionTrim = partition.trim();
        if (!partitionTrim.equals("")) {
            // Use the trimmed text so the parenthesis check and the emitted clause agree.
            if (partitionTrim.startsWith("(") && partitionTrim.endsWith(")")) {
                sb.append(" PARTITION ").append(partitionTrim);
            } else {
                sb.append(" PARTITION (").append(partitionTrim).append(")");
            }
        }
    }
    String sql = sb.toString();

    String jobId = null;
    Connection connection = null;
    Statement statement = null;
    ResultSet rs = null;
    try {
        Class.forName("com.mysql.jdbc.Driver");
        String url = AdsUtil.prepareJdbcUrl(this.adsURL, this.schema, this.socketTimeout, this.suffix);
        Properties connectionProps = new Properties();
        connectionProps.put("user", userName);
        connectionProps.put("password", password);
        connection = DriverManager.getConnection(url, connectionProps);
        statement = connection.createStatement();
        LOG.info("正在从ODPS数据库导数据到ADS中: " + sql);
        LOG.info("由于ADS的限制,ADS导数据最少需要20分钟,请耐心等待");
        rs = statement.executeQuery(sql);
        // Keep the first column of the last row as the job id.
        while (DBUtil.asyncResultSetNext(rs)) {
            jobId = rs.getString(1);
        }
    } catch (Exception e) {
        // Driver lookup, connection, execution and result reading failures all map to LOADDATA_FAILED.
        throw new AdsException(AdsException.ADS_LOADDATA_FAILED, e.getMessage(), e);
    } finally {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException ignored) {
                // Best-effort cleanup; a close failure must not hide the real outcome.
            }
        }
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException ignored) {
                // Best-effort cleanup; a close failure must not hide the real outcome.
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ignored) {
                // Best-effort cleanup; a close failure must not hide the real outcome.
            }
        }
    }

    // Checked outside the try so the catch-all above cannot re-wrap this as ADS_LOADDATA_FAILED.
    if (jobId == null) {
        throw new AdsException(AdsException.ADS_LOADDATA_JOBID_NOT_AVAIL,
                "Job id is not available for the submitted LOAD DATA.", null);
    }
    return jobId;
}