protected void doExecute()

in src/main/java/org/opensearch/search/asynchronous/transport/TransportSubmitAsynchronousSearchAction.java [68:123]


    /**
     * Handles a submit-asynchronous-search request.
     *
     * <p>The user string is read from the thread context and parsed, a context is created and stored through
     * {@code asynchronousSearchService}, and the caller's search request is wrapped in a {@link SearchRequest}
     * subclass whose {@code createTask} builds an {@link AsynchronousSearchTask}, bootstraps the search and
     * registers {@code listener} (wrapped with a scheduled timeout based on
     * {@code request.getWaitForCompletionTimeout()}) on the context's progress listener. The wrapped request is
     * then executed through {@code transportSearchAction} with the submit task set as its parent.
     *
     * <p>If any of this throws, the error is logged, the context (when one was already created) is freed, and
     * {@code listener} is failed with the original exception regardless of whether the cleanup succeeded.
     *
     * @param task     the submit task; its task id becomes the parent task of the search request
     * @param request  the submit request carrying the search request and the wait-for-completion timeout
     * @param listener receives the asynchronous search response or the submit failure
     */
    protected void doExecute(Task task, SubmitAsynchronousSearchRequest request, ActionListener<AsynchronousSearchResponse> listener) {
        // Declared outside the try so the catch block can release it if submission fails part-way through.
        AsynchronousSearchContext createdContext = null;
        String serializedUser = threadPool.getThreadContext().getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT);
        User user = User.parse(serializedUser);
        try {
            final long submitTimeMillis = threadPool.relativeTimeInMillis();
            createdContext = asynchronousSearchService.createAndStoreContext(request, submitTimeMillis,
                    () -> searchService.aggReduceContextBuilder(request.getSearchRequest()), user);
            assert createdContext.getAsynchronousSearchProgressListener() != null
                    : "missing progress listener for an active context";
            AsynchronousSearchProgressListener searchProgressListener = createdContext.getAsynchronousSearchProgressListener();
            // createdContext is reassigned above, so the anonymous class needs an effectively final alias.
            AsynchronousSearchContext capturedContext = createdContext;
            SearchRequest delegatingSearchRequest = new SearchRequest(request.getSearchRequest()) {
                @Override
                public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
                    AsynchronousSearchTask searchTask = new AsynchronousSearchTask(
                            id, type, AsynchronousSearchTask.NAME, parentTaskId, headers,
                            (AsynchronousSearchActiveContext) capturedContext, request,
                            asynchronousSearchService::onCancelledFreeActiveContext);

                    asynchronousSearchService.bootstrapSearch(searchTask, capturedContext.getContextId());

                    // On timeout: detach the wrapped listener from the progress listener and answer the caller
                    // with whatever response the context currently holds.
                    PrioritizedActionListener<AsynchronousSearchResponse> timeoutAwareListener =
                            AsynchronousSearchTimeoutWrapper.wrapScheduledTimeout(
                                    threadPool,
                                    request.getWaitForCompletionTimeout(),
                                    AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME,
                                    listener,
                                    timedOutListener -> {
                                        searchProgressListener.searchProgressActionListener().removeListener(timedOutListener);
                                        listener.onResponse(capturedContext.getAsynchronousSearchResponse());
                                    });
                    searchProgressListener.searchProgressActionListener().addOrExecuteListener(timeoutAwareListener);
                    return searchTask;
                }
            };
            // set the parent task as the submit task for cancellation on connection close
            TaskId submitTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
            delegatingSearchRequest.setParentTask(submitTaskId);
            transportSearchAction.execute(delegatingSearchRequest, searchProgressListener);

        } catch (Exception submitFailure) {
            logger.error(() -> new ParameterizedMessage("Failed to submit asynchronous search request [{}]", request), submitFailure);
            if (createdContext == null) {
                // Nothing was stored, so there is nothing to clean up.
                listener.onFailure(submitFailure);
                return;
            }
            AsynchronousSearchActiveContext contextToFree = (AsynchronousSearchActiveContext) createdContext;
            // The caller always receives the original submit failure, whether or not the cleanup succeeds.
            asynchronousSearchService.freeContext(
                    contextToFree.getAsynchronousSearchId(),
                    contextToFree.getContextId(),
                    user,
                    ActionListener.wrap(
                            freed -> {
                                logger.debug(() -> new ParameterizedMessage(
                                        "Successfully cleaned up context on submit asynchronous search [{}] on failure",
                                        contextToFree.getAsynchronousSearchId()), submitFailure);
                                listener.onFailure(submitFailure);
                            },
                            cleanupFailure -> {
                                logger.debug(() -> new ParameterizedMessage(
                                        "Failed to cleaned up context on submit asynchronous search [{}] on failure",
                                        contextToFree.getAsynchronousSearchId()), cleanupFailure);
                                listener.onFailure(submitFailure);
                            }));
        }
    }