in src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java [111:202]
/**
 * Submits an asynchronous query and returns once the background task has published its query id.
 *
 * <p>The request is rejected up front when its encoding is not contained in {@code FILE_ENCODING} or its
 * file format is not contained in {@code FILE_FORMAT}. Otherwise the query is handed to
 * {@code executorService} together with the caller's {@link SecurityContext}; this method then polls until
 * the task has stored its query id and answers with the status reported by
 * {@code asyncQueryService.queryStatus}.
 *
 * @param sqlRequest the async query request; an empty separator is replaced with {@code ","} and the
 *                   query id is written back into it by the background task
 * @return an envelope carrying the query id and a SUCCESSFUL, FAILED or RUNNING status
 * @throws InterruptedException if the calling thread is interrupted while waiting for the query id
 * @throws IOException declared by the signature; NOTE(review): presumably raised by the status lookup —
 *                     confirm against {@code asyncQueryService}
 */
public EnvelopeResponse<AsyncQueryResponse> query(@Valid @RequestBody final AsyncQuerySQLRequest sqlRequest)
        throws InterruptedException, IOException {
    aclEvaluate.checkProjectQueryPermission(sqlRequest.getProject());
    checkProjectName(sqlRequest.getProject());
    if (!FILE_ENCODING.contains(sqlRequest.getEncode().toLowerCase(Locale.ROOT))) {
        // Report the rejected encoding and the supported encodings, not the file format.
        return new EnvelopeResponse<>(QueryErrorCode.ASYNC_QUERY_ILLEGAL_PARAM.toErrorCode().getString(),
                new AsyncQueryResponse(sqlRequest.getQueryId(), AsyncQueryResponse.Status.FAILED, "Encoding "
                        + sqlRequest.getEncode() + " unsupported. Only " + FILE_ENCODING + " are supported"),
                "");
    }
    if (!FILE_FORMAT.contains(sqlRequest.getFormat().toLowerCase(Locale.ROOT))) {
        return new EnvelopeResponse<>(QueryErrorCode.ASYNC_QUERY_ILLEGAL_PARAM.toErrorCode().getString(),
                new AsyncQueryResponse(sqlRequest.getQueryId(), AsyncQueryResponse.Status.FAILED, "Format "
                        + sqlRequest.getFormat() + " unsupported. Only " + FILE_FORMAT + " are supported"),
                "");
    }
    final AtomicReference<String> queryIdRef = new AtomicReference<>();
    final AtomicReference<String> exceptionHandle = new AtomicReference<>();
    // Captured here so the worker thread can run the query under the caller's security context.
    final SecurityContext context = SecurityContextHolder.getContext();
    if (StringUtils.isEmpty(sqlRequest.getSeparator())) {
        sqlRequest.setSeparator(",");
    }
    AsyncQueryRequestLimits asyncQueryRequestLimits = null;
    if (NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue()) {
        asyncQueryRequestLimits = new AsyncQueryRequestLimits();
    }
    final AsyncQueryRequestLimits finalAsyncQueryRequestLimits = asyncQueryRequestLimits;

    final Runnable asyncQueryTask = () -> {
        String format = sqlRequest.getFormat().toLowerCase(Locale.ROOT);
        String encode = sqlRequest.getEncode().toLowerCase(Locale.ROOT);
        SecurityContextHolder.setContext(context);
        SparderEnv.setSeparator(sqlRequest.getSeparator());
        QueryContext queryContext = QueryContext.current();
        sqlRequest.setQueryId(queryContext.getQueryId());
        queryContext.getQueryTagInfo().setAsyncQuery(true);
        queryContext.getQueryTagInfo().setFileFormat(format);
        queryContext.getQueryTagInfo().setFileEncode(encode);
        queryContext.getQueryTagInfo().setFileName(sqlRequest.getFileName());
        queryContext.getQueryTagInfo().setSeparator(sqlRequest.getSeparator());
        queryContext.getQueryTagInfo().setIncludeHeader(sqlRequest.isIncludeHeader());
        queryContext.setProject(sqlRequest.getProject());
        logger.info("Start a new async query with queryId: {}", queryContext.getQueryId());
        String queryId = queryContext.getQueryId();
        // Publishing the id is what releases the polling request thread below.
        queryIdRef.set(queryId);
        try {
            asyncQueryService.saveQueryUsername(sqlRequest.getProject(), queryId);
            SQLResponse response = queryService.queryWithCache(sqlRequest);
            if (response.isException()) {
                AsyncQueryUtil.createErrorFlag(sqlRequest.getProject(), queryContext.getQueryId(),
                        response.getExceptionMessage());
                exceptionHandle.set(response.getExceptionMessage());
            }
        } catch (Exception e) {
            try {
                logger.error("failed to run query {}", queryContext.getQueryId(), e);
                AsyncQueryUtil.createErrorFlag(sqlRequest.getProject(), queryContext.getQueryId(),
                        e.getMessage());
                exceptionHandle.set(e.getMessage());
            } catch (Exception e1) {
                // Recording the failure itself failed: keep the query error and append the secondary one
                // instead of concatenating onto a handle that has not been set yet.
                exceptionHandle.set(e.getMessage() + "\n" + e1.getMessage());
                throw new RuntimeException(e1);
            }
        } finally {
            if (finalAsyncQueryRequestLimits != null) {
                finalAsyncQueryRequestLimits.close();
            }
            logger.info("Async query with queryId: {} end", queryContext.getQueryId());
            QueryContext.current().close();
        }
    };

    final java.util.concurrent.Future<?> asyncTask;
    try {
        asyncTask = executorService.submit(Objects.requireNonNull(TtlRunnable.get(asyncQueryTask)));
    } catch (RuntimeException e) {
        // The task never started, so its finally block will not close the limits object; do it here.
        if (finalAsyncQueryRequestLimits != null) {
            finalAsyncQueryRequestLimits.close();
        }
        throw e;
    }

    // Wait for the worker to publish its query id, but stop if the task ended without ever doing so;
    // otherwise this request thread would poll forever.
    while (queryIdRef.get() == null && !asyncTask.isDone()) {
        Thread.sleep(200);
    }
    final String assignedQueryId = queryIdRef.get();
    if (assignedQueryId == null) {
        return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
                new AsyncQueryResponse(sqlRequest.getQueryId(), AsyncQueryResponse.Status.FAILED,
                        "Async query task terminated before a query id was assigned"),
                "");
    }

    switch (asyncQueryService.queryStatus(sqlRequest.getProject(), assignedQueryId)) {
    case SUCCESS:
        return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
                new AsyncQueryResponse(assignedQueryId, AsyncQueryResponse.Status.SUCCESSFUL, "query success"),
                "");
    case FAILED:
        return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
                new AsyncQueryResponse(assignedQueryId, AsyncQueryResponse.Status.FAILED, exceptionHandle.get()),
                "");
    default:
        return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
                new AsyncQueryResponse(assignedQueryId, AsyncQueryResponse.Status.RUNNING, "query still running"),
                "");
    }
}