public EnvelopeResponse query()

in src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java [111:202]


    /**
     * Validates an async query request, hands it to {@code executorService}, and reports the status
     * observed once the worker has published the query id.
     *
     * <p>When the requested encoding or file format is not in {@code FILE_ENCODING} / {@code FILE_FORMAT},
     * nothing is submitted and a {@code FAILED} response carrying the
     * {@code ASYNC_QUERY_ILLEGAL_PARAM} error code is returned. Otherwise the query runs on a worker
     * thread while this method polls every 200 ms until the worker has published its query id, then
     * maps {@code asyncQueryService.queryStatus(...)} to a response.
     *
     * @param sqlRequest the async query request; it is mutated: an empty separator is replaced by
     *                   {@code ","} and the query id is overwritten by the worker
     * @return an envelope holding the query id, a status (SUCCESSFUL, FAILED or RUNNING) and a message;
     *         note that a FAILED query status is still returned with {@code KylinException.CODE_SUCCESS}
     * @throws InterruptedException if the calling thread is interrupted while waiting for the query id
     * @throws IOException declared for the service calls made on the request thread
     */
    public EnvelopeResponse<AsyncQueryResponse> query(@Valid @RequestBody final AsyncQuerySQLRequest sqlRequest)
            throws InterruptedException, IOException {
        aclEvaluate.checkProjectQueryPermission(sqlRequest.getProject());
        checkProjectName(sqlRequest.getProject());
        if (!FILE_ENCODING.contains(sqlRequest.getEncode().toLowerCase(Locale.ROOT))) {
            // Report the rejected encoding and the accepted encodings (not the file format).
            return new EnvelopeResponse<>(QueryErrorCode.ASYNC_QUERY_ILLEGAL_PARAM.toErrorCode().getString(),
                    new AsyncQueryResponse(sqlRequest.getQueryId(), AsyncQueryResponse.Status.FAILED, "Encode "
                            + sqlRequest.getEncode() + " unsupported. Only " + FILE_ENCODING + " are supported"),
                    "");
        }
        if (!FILE_FORMAT.contains(sqlRequest.getFormat().toLowerCase(Locale.ROOT))) {
            return new EnvelopeResponse<>(QueryErrorCode.ASYNC_QUERY_ILLEGAL_PARAM.toErrorCode().getString(),
                    new AsyncQueryResponse(sqlRequest.getQueryId(), AsyncQueryResponse.Status.FAILED, "Format "
                            + sqlRequest.getFormat() + " unsupported. Only " + FILE_FORMAT + " are supported"),
                    "");
        }
        // Hand-off cells between the worker thread and this request thread.
        final AtomicReference<String> queryIdRef = new AtomicReference<>();
        final AtomicReference<String> exceptionHandle = new AtomicReference<>();
        // Captured here so the worker can run under the caller's security context.
        final SecurityContext context = SecurityContextHolder.getContext();

        if (StringUtils.isEmpty(sqlRequest.getSeparator())) {
            sqlRequest.setSeparator(",");
        }
        // Only created when the project uses a unique async-query YARN queue; closed by the worker.
        AsyncQueryRequestLimits asyncQueryRequestLimits = null;
        if (NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue()) {
            asyncQueryRequestLimits = new AsyncQueryRequestLimits();
        }
        // Effectively-final alias so the lambda below can capture it.
        AsyncQueryRequestLimits finalAsyncQueryRequestLimits = asyncQueryRequestLimits;
        executorService.submit(Objects.requireNonNull(TtlRunnable.get(() -> {
            String format = sqlRequest.getFormat().toLowerCase(Locale.ROOT);
            String encode = sqlRequest.getEncode().toLowerCase(Locale.ROOT);
            SecurityContextHolder.setContext(context);

            SparderEnv.setSeparator(sqlRequest.getSeparator());

            QueryContext queryContext = QueryContext.current();
            sqlRequest.setQueryId(queryContext.getQueryId());
            queryContext.getQueryTagInfo().setAsyncQuery(true);
            queryContext.getQueryTagInfo().setFileFormat(format);
            queryContext.getQueryTagInfo().setFileEncode(encode);
            queryContext.getQueryTagInfo().setFileName(sqlRequest.getFileName());
            queryContext.getQueryTagInfo().setSeparator(sqlRequest.getSeparator());
            queryContext.getQueryTagInfo().setIncludeHeader(sqlRequest.isIncludeHeader());
            queryContext.setProject(sqlRequest.getProject());
            logger.info("Start a new async query with queryId: {}", queryContext.getQueryId());
            String queryId = queryContext.getQueryId();
            // Publishing the id releases the request thread waiting below.
            queryIdRef.set(queryId);
            try {
                asyncQueryService.saveQueryUsername(sqlRequest.getProject(), queryId);
                SQLResponse response = queryService.queryWithCache(sqlRequest);
                if (response.isException()) {
                    AsyncQueryUtil.createErrorFlag(sqlRequest.getProject(), queryContext.getQueryId(),
                            response.getExceptionMessage());
                    exceptionHandle.set(response.getExceptionMessage());
                }
            } catch (Exception e) {
                try {
                    logger.error("failed to run query {}", queryContext.getQueryId(), e);
                    AsyncQueryUtil.createErrorFlag(sqlRequest.getProject(), queryContext.getQueryId(), e.getMessage());
                    exceptionHandle.set(e.getMessage());
                } catch (Exception e1) {
                    // Recording the failure itself failed: keep the original query error and append
                    // the secondary one, so neither message is lost.
                    exceptionHandle.set(e.getMessage() + "\n" + e1.getMessage());
                    throw new RuntimeException(e1);
                }
            } finally {
                if (finalAsyncQueryRequestLimits != null) {
                    finalAsyncQueryRequestLimits.close();
                }
                logger.info("Async query with queryId: {} end", queryContext.getQueryId());
                QueryContext.current().close();
            }
        })));

        // Wait until the worker has published its query id.
        // NOTE(review): this wait is unbounded - if the worker fails before publishing the id, the
        // request thread never leaves this loop. Consider bounding it or watching the task's Future.
        while (queryIdRef.get() == null) {
            Thread.sleep(200);
        }

        switch (asyncQueryService.queryStatus(sqlRequest.getProject(), sqlRequest.getQueryId())) {
        case SUCCESS:
            return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
                    new AsyncQueryResponse(queryIdRef.get(), AsyncQueryResponse.Status.SUCCESSFUL, "query success"),
                    "");
        case FAILED:
            return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
                    new AsyncQueryResponse(queryIdRef.get(), AsyncQueryResponse.Status.FAILED, exceptionHandle.get()),
                    "");
        default:
            return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
                    new AsyncQueryResponse(queryIdRef.get(), AsyncQueryResponse.Status.RUNNING, "query still running"),
                    "");
        }
    }