in src/main/java/org/opensearch/search/asynchronous/transport/TransportAsynchronousSearchRoutingAction.java [81:162]
public abstract void handleRequest(AsynchronousSearchId asynchronousSearchId, Request request, ActionListener<Response> listener,
User user);
final class AsyncForwardAction extends AbstractRunnable {
private final ActionListener<Response> listener;
private final Request request;
private DiscoveryNode targetNode;
private AsynchronousSearchId asynchronousSearchId;
AsyncForwardAction(Request request, ActionListener<Response> listener) {
try {
this.asynchronousSearchId = AsynchronousSearchIdConverter.parseAsyncId(request.getId());
this.request = request;
this.listener = listener;
this.targetNode = clusterService.state().nodes().get(asynchronousSearchId.getNode());
} catch (IllegalArgumentException e) { // failure in parsing asynchronous search
logger.error(() -> new ParameterizedMessage("Failed to parse asynchronous search ID [{}]", request.getId()), e);
listener.onFailure(AsynchronousSearchExceptionUtils.buildResourceNotFoundException(request.getId()));
throw e;
}
}
@Override
public void onFailure(Exception e) {
logger.error(() -> new ParameterizedMessage(
"Failed to dispatch request for action [{}] for asynchronous search [{}]", actionName, request.getId()), e);
sendLocalRequest(asynchronousSearchId, request, listener);
}
@Override
protected void doRun() {
ClusterState state = clusterService.state();
// forward request only if the local node isn't the node coordinating the search and the node coordinating
// the search exists in the cluster
TransportRequestOptions requestOptions = TransportRequestOptions.builder().withTimeout(
asynchronousSearchService.getMaxWaitForCompletionTimeout()).build();
if (targetNode != null && state.nodes().getLocalNode().equals(targetNode) == false && state.nodes().nodeExists(targetNode)) {
logger.debug("Forwarding asynchronous search id [{}] request to target node [{}]", request.getId(), targetNode);
transportService.sendRequest(targetNode, actionName, request, requestOptions,
new ActionListenerResponseHandler<Response>(listener, responseReader) {
@Override
public void handleException(final TransportException exp) {
Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException ||
(exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
// we want to retry here a bit to see if the node connects backs
logger.debug("Connection exception while trying to forward request with id[{}] to " +
"target node [{}] Error: [{}]",
request.getId(), targetNode, exp.getDetailedMessage());
//try on local node since we weren't able to forward
sendLocalRequest(asynchronousSearchId, request, listener);
} else {
logger.debug("Exception received for request with id[{}] to from target node [{}], Error: [{}]",
request.getId(), targetNode, exp.getDetailedMessage());
listener.onFailure(cause instanceof Exception ? (Exception) cause
: new NotSerializableExceptionWrapper(cause));
}
}
@Override
public void handleResponse(Response response) {
logger.debug("Received the response for asynchronous search id [{}] from target node [{}]", request.getId(),
targetNode);
listener.onResponse(response);
}
});
} else {
sendLocalRequest(asynchronousSearchId, request, listener);
}
}
private void sendLocalRequest(AsynchronousSearchId asynchronousSearchId, Request request, ActionListener<Response> listener) {
ThreadContext threadContext = threadPool.getThreadContext();
String userStr = threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT);
User user = User.parse(userStr);
try (ThreadContext.StoredContext ctx = threadContext.stashContext()) {
handleRequest(asynchronousSearchId, request, listener, user);
}
}
}