in odps-console-sql/src/main/java/com/aliyun/openservices/odps/console/QueryCommand.java [182:272]
/**
 * Submits this command's SQL text as a task and runs it, retrying on failure.
 *
 * <p>A dry run builds a {@code SqlPlanTask}; otherwise a {@code SQLTask} is built from
 * {@code getCommandText()}. Select commands are attempted once; other commands are attempted
 * {@code context.getRetryTimes()} times (at least once). When the command is not a select,
 * "OK" is written through the writer's error channel after the job succeeds.
 *
 * @throws OdpsException declared for failures propagated from the underlying calls
 * @throws ODPSConsoleException if the job fails on a path that must not be retried, or if
 *     every attempt fails; the triggering exception is attached as the cause
 */
public void run() throws OdpsException, ODPSConsoleException {
    ExecutionContext context = getContext();
    // PMC mode is switched on only when the session setting is literally "true" (case-insensitive).
    context.setPMCMode(
        "true".equalsIgnoreCase(SetCommand.setMap.getOrDefault(PMC_TASK_CONSOLE_KEY, "false")));

    // PMCTask always accompanied by triggerandwait command, thus async mode is supported by select
    if ((!getContext().isPMCMode()) && (isSelectCommand && context.isAsyncMode())) {
        getWriter().writeError("[async mode]: can't support select command.");
        return;
    }

    // In interactive mode, ask for confirmation of the SQL input and stop if it is declined.
    if (context.isInteractiveMode() && !confirmSQLInput()) {
        return;
    }

    boolean isDryRun = getContext().isDryRun();
    DefaultOutputWriter writer = context.getOutputWriter();

    // Select commands are attempted exactly once; a non-positive configured value also means once.
    int retryTime = isSelectCommand ? 1 : context.getRetryTimes();
    retryTime = retryTime > 0 ? retryTime : 1;

    while (retryTime > 0) {
        Task task = null;
        try {
            taskName = getTaskName(isDryRun);
            if (isDryRun) {
                task = new SqlPlanTask(taskName, getCommandText());
            } else {
                SQLTask sqlTask = new SQLTask();
                sqlTask.setName(taskName);
                sqlTask.setQuery(getCommandText());
                task = sqlTask;
            }

            HashMap<String, String> taskConfig = QueryUtil.getTaskConfig();
            if (!getContext().isMachineReadable()) {
                Map<String, String> settings = new HashMap<String, String>();
                settings.put("odps.sql.select.output.format", "HumanReadable");
                addSetting(taskConfig, settings);
            }
            for (Entry<String, String> property : taskConfig.entrySet()) {
                task.setProperty(property.getKey(), property.getValue());
            }

            runJob(task);
            // success
            break;
        } catch (UserInterruptException e) {
            // A user interrupt is never retried or wrapped.
            throw e;
        } catch (Exception e) {
            // For queries that QueryUtil.isOperatorDisabled flags (per the original author:
            // insert into dynamic partitions), decide whether a retry is allowed at all.
            if (task instanceof SQLTask && QueryUtil.isOperatorDisabled(((SQLTask) task).getQuery())) {
                // getMessage() may be null; fall back to toString() so the checks below cannot
                // throw a NullPointerException that would mask the real failure.
                String errorMessage = e.getMessage() != null ? e.getMessage() : e.toString();
                // Two cases must not be retried:
                //   1. the error is explicitly ODPS-0110999;
                //   2. an unknown error without any "ODPS-" code, e.g. an executor crash or an
                //      unknown network problem.
                if (errorMessage.contains("ODPS-0110999")) {
                    throw new ODPSConsoleException(errorMessage, e);
                } else if (!errorMessage.contains("ODPS-")) {
                    // Unknown error: tag it with ODPS-0110999 and keep the original as the cause.
                    throw new ODPSConsoleException("ODPS-0110999:" + errorMessage, e);
                }
            }

            retryTime--;
            if (retryTime == 0) {
                // Attempts exhausted: surface the last failure.
                throw new ODPSConsoleException(e.getMessage(), e);
            }
            // Report the number of attempts still remaining, with the full trace at debug level.
            writer.writeError("retry " + retryTime);
            writer.writeDebug(StringUtils.stringifyException(e));
        }
    }

    // For anything other than a select statement, print OK once the job has finished.
    if (!isSelectCommand) {
        writer.writeError("OK");
    }
}