in src/main/java/com/aliyun/odps/jdbc/OdpsStatement.java [748:837]
/**
 * Runs one SQL statement through this statement's {@code sqlExecutor} and records the outcome
 * on this object.
 *
 * <p>The settings passed to the executor are assembled in this order, later entries overriding
 * earlier ones: the statement-level {@code sqlTaskProperties}, the per-call {@code properties},
 * the query timeout (only when {@code queryTimeout != -1} and no timeout was supplied
 * explicitly), and the connection's auto select limit (when it is positive).
 *
 * <p>Side effects visible in this method: {@code inputProperties}, {@code logviewUrl} and
 * {@code executeInstance} are reassigned; for updates {@code updateCount} may be set and
 * {@code odpsResultSet} is cleared; for queries {@code setResultSetInternal()} is invoked.
 *
 * @param sql        the SQL text; a trailing semicolon is appended when it is missing
 * @param properties per-call task properties, may be {@code null} or empty
 * @param isUpdate   {@code true} to treat the statement as DML/DDL (wait for the instance and
 *                   derive an update count), {@code false} to prepare a result set
 * @throws SQLException declared by this method; {@link OdpsException} and {@link IOException}
 *                      raised while running are handed to {@code throwSQLException}
 */
private void runSQL(String sql, Properties properties, boolean isUpdate) throws SQLException {
  SQLExecutor executor = sqlExecutor;
  try {
    // If the client forgot to end with a semicolon, append it.
    if (!sql.trim().endsWith(";")) {
      sql += ";";
    }
    if (sql.length() > 4000) {
      connHandle.log.warn("The length of sql is too long, it may cause performance issues. SQL length: " + sql.length());
    }

    // Statement-level properties go in first so that per-call properties can override them.
    Map<String, String> settings = new HashMap<>();
    for (String key : sqlTaskProperties.stringPropertyNames()) {
      settings.put(key, sqlTaskProperties.getProperty(key));
    }
    inputProperties = new Properties();
    if (properties != null && !properties.isEmpty()) {
      for (String key : properties.stringPropertyNames()) {
        String value = properties.getProperty(key);
        settings.put(key, value);
        inputProperties.setProperty(key, value);
      }
    }

    // An explicitly supplied timeout setting wins over the statement's queryTimeout.
    if (queryTimeout != -1 && !settings.containsKey("odps.sql.session.query.timeout")) {
      settings.put("odps.sql.session.query.timeout", String.valueOf(queryTimeout));
    }
    Long autoSelectLimit = connHandle.getAutoSelectLimit();
    if (autoSelectLimit != null && autoSelectLimit > 0) {
      settings.put("odps.sql.select.auto.limit", autoSelectLimit.toString());
    }

    // Log only once the map is final, so the log reflects everything handed to the executor
    // (including the timeout and auto-limit entries added above).
    if (!settings.isEmpty()) {
      connHandle.log.info("Enabled SQL task properties: " + settings);
    }

    long begin = System.currentTimeMillis();
    connHandle.log.info("Run SQL: " + sql + ", Begin time: " + begin);
    executor.run(sql, settings);
    logviewUrl = executor.getLogView();
    connHandle.log.info("LogView: " + logviewUrl);
    executeInstance = executor.getInstance();
    if (executeInstance != null) {
      connHandle.log.info("InstanceId: " + executeInstance.getId());
    }

    if (isUpdate) {
      if (executeInstance != null) {
        executeInstance.waitForSuccess();
        Instance.TaskSummary taskSummary = null;
        try {
          taskSummary = executeInstance.getTaskSummary(JDBC_SQL_OFFLINE_TASK_NAME);
        } catch (OdpsException e) {
          // Best effort: the statement itself succeeded, so do not fail it here. The update
          // count simply stays unknown; keep the cause in the log for diagnosis.
          connHandle.log.warn(
              "Failed to get TaskSummary: instance_id=" + executeInstance.getId() + ", taskname="
              + JDBC_SQL_OFFLINE_TASK_NAME + ", error=" + e.getMessage());
        }
        if (taskSummary != null) {
          updateCount = Utils.getSinkCountFromTaskSummary(
              StringEscapeUtils.unescapeJava(taskSummary.getJsonSummary()));
          connHandle.log.debug("successfully updated " + updateCount + " records");
        } else {
          connHandle.log.warn("task summary is empty");
        }
      }
      // For DML or DDL, treat the statement as producing no result set even if one exists.
      odpsResultSet = null;
    } else {
      setResultSetInternal();
    }

    long end = System.currentTimeMillis();
    long elapsed = end - begin;
    // A threshold of -1 disables the long-job warning.
    if (connHandle.getLongJobWarningThreshold() != -1 && elapsed > connHandle.getLongJobWarningThreshold()) {
      connHandle.log.warn("SQL execution time exceeds long job warning threshold. Execution time: " + elapsed
                          + (executeInstance == null ? "" : (", InstanceId: " + executeInstance.getId())));
    }
    if (executeInstance != null) {
      connHandle.log.info("It took me " + elapsed + " ms to run sql, instanceId: "
                          + executeInstance.getId());
    } else {
      connHandle.log.info("It took me " + elapsed + " ms to run sql");
    }

    List<String> exeLog = executor.getExecutionLog();
    for (String log : exeLog) {
      connHandle.log.info("Session execution log: " + log);
    }
  } catch (OdpsException | IOException e) {
    throwSQLException(e, sql, executor.getInstance(), executor.getLogView());
  }
}