in src/main/java/org/opensearch/search/asynchronous/transport/TransportSubmitAsynchronousSearchAction.java [68:123]
protected void doExecute(Task task, SubmitAsynchronousSearchRequest request, ActionListener<AsynchronousSearchResponse> listener) {
AsynchronousSearchContext asynchronousSearchContext = null;
String userStr = threadPool.getThreadContext().getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT);
User user = User.parse(userStr);
try {
final long relativeStartTimeInMillis = threadPool.relativeTimeInMillis();
asynchronousSearchContext = asynchronousSearchService.createAndStoreContext(request, relativeStartTimeInMillis,
() -> searchService.aggReduceContextBuilder(request.getSearchRequest()), user);
assert asynchronousSearchContext.getAsynchronousSearchProgressListener() != null
: "missing progress listener for an active context";
AsynchronousSearchProgressListener progressListener = asynchronousSearchContext.getAsynchronousSearchProgressListener();
AsynchronousSearchContext context = asynchronousSearchContext; //making it effectively final for usage in anonymous class.
SearchRequest searchRequest = new SearchRequest(request.getSearchRequest()) {
@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
AsynchronousSearchTask asynchronousSearchTask = new AsynchronousSearchTask(id, type, AsynchronousSearchTask.NAME,
parentTaskId, headers,
(AsynchronousSearchActiveContext) context, request, asynchronousSearchService::onCancelledFreeActiveContext);
asynchronousSearchService.bootstrapSearch(asynchronousSearchTask, context.getContextId());
PrioritizedActionListener<AsynchronousSearchResponse> wrappedListener = AsynchronousSearchTimeoutWrapper
.wrapScheduledTimeout(threadPool, request.getWaitForCompletionTimeout(),
AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME, listener,
(actionListener) -> { progressListener.searchProgressActionListener().removeListener(actionListener);
listener.onResponse(context.getAsynchronousSearchResponse());
});
progressListener.searchProgressActionListener().addOrExecuteListener(wrappedListener);
return asynchronousSearchTask;
}
};
//set the parent task as the submit task for cancellation on connection close
searchRequest.setParentTask(task.taskInfo(clusterService.localNode().getId(), false).getTaskId());
transportSearchAction.execute(searchRequest, progressListener);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to submit asynchronous search request [{}]", request), e);
if (asynchronousSearchContext != null) {
AsynchronousSearchActiveContext asynchronousSearchActiveContext = (AsynchronousSearchActiveContext)
asynchronousSearchContext;
asynchronousSearchService.freeContext(asynchronousSearchActiveContext.getAsynchronousSearchId(),
asynchronousSearchActiveContext.getContextId(), user,
ActionListener.wrap((r) -> {
logger.debug(() -> new ParameterizedMessage("Successfully cleaned up context on submit asynchronous" +
" search [{}] on failure", asynchronousSearchActiveContext.getAsynchronousSearchId()), e);
listener.onFailure(e);
}, (ex) -> {
logger.debug(() -> new ParameterizedMessage("Failed to cleaned up context on submit asynchronous search" +
" [{}] on failure", asynchronousSearchActiveContext.getAsynchronousSearchId()), ex);
listener.onFailure(e);
})
);
} else {
listener.onFailure(e);
}
}
}