public abstract void handleRequest()

in src/main/java/org/opensearch/search/asynchronous/transport/TransportAsynchronousSearchRoutingAction.java [81:162]


    public abstract void handleRequest(AsynchronousSearchId asynchronousSearchId, Request request, ActionListener<Response> listener,
                                       User user);

    final class AsyncForwardAction extends AbstractRunnable {

        private final ActionListener<Response> listener;
        private final Request request;
        private DiscoveryNode targetNode;
        private AsynchronousSearchId asynchronousSearchId;

        AsyncForwardAction(Request request, ActionListener<Response> listener) {
            try {
                this.asynchronousSearchId = AsynchronousSearchIdConverter.parseAsyncId(request.getId());

                this.request = request;
                this.listener = listener;
                this.targetNode = clusterService.state().nodes().get(asynchronousSearchId.getNode());
            } catch (IllegalArgumentException e) { // failure in parsing asynchronous search
                logger.error(() -> new ParameterizedMessage("Failed to parse asynchronous search ID [{}]", request.getId()), e);
                listener.onFailure(AsynchronousSearchExceptionUtils.buildResourceNotFoundException(request.getId()));
                throw e;
            }
        }

        @Override
        public void onFailure(Exception e) {
            logger.error(() -> new ParameterizedMessage(
                    "Failed to dispatch request for action [{}] for asynchronous search [{}]", actionName, request.getId()), e);
            sendLocalRequest(asynchronousSearchId, request, listener);
        }

        @Override
        protected void doRun() {
            ClusterState state = clusterService.state();
            // forward request only if the local node isn't the node coordinating the search and the node coordinating
            // the search exists in the cluster
            TransportRequestOptions requestOptions = TransportRequestOptions.builder().withTimeout(
                    asynchronousSearchService.getMaxWaitForCompletionTimeout()).build();
            if (targetNode != null && state.nodes().getLocalNode().equals(targetNode) == false && state.nodes().nodeExists(targetNode)) {
                logger.debug("Forwarding asynchronous search id [{}] request to target node [{}]", request.getId(), targetNode);
                transportService.sendRequest(targetNode, actionName, request, requestOptions,
                        new ActionListenerResponseHandler<Response>(listener, responseReader) {
                            @Override
                            public void handleException(final TransportException exp) {
                                Throwable cause = exp.unwrapCause();
                                if (cause instanceof ConnectTransportException ||
                                        (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
                                    // we want to retry here a bit to see if the node connects backs
                                    logger.debug("Connection exception while trying to forward request with id[{}] to " +
                                                    "target node [{}] Error: [{}]",
                                            request.getId(), targetNode, exp.getDetailedMessage());
                                    //try on local node since we weren't able to forward
                                    sendLocalRequest(asynchronousSearchId, request, listener);
                                } else {
                                    logger.debug("Exception received for request with id[{}] to from target node [{}],  Error: [{}]",
                                            request.getId(), targetNode, exp.getDetailedMessage());
                                    listener.onFailure(cause instanceof Exception ? (Exception) cause
                                            : new NotSerializableExceptionWrapper(cause));
                                }
                            }

                            @Override
                            public void handleResponse(Response response) {
                                logger.debug("Received the response for asynchronous search id [{}] from target node [{}]", request.getId(),
                                        targetNode);
                                listener.onResponse(response);
                            }
                        });
            } else {
                sendLocalRequest(asynchronousSearchId, request, listener);
            }
        }

        private void sendLocalRequest(AsynchronousSearchId asynchronousSearchId, Request request, ActionListener<Response> listener) {
            ThreadContext threadContext = threadPool.getThreadContext();
            String userStr = threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT);
            User user = User.parse(userStr);
            try (ThreadContext.StoredContext ctx = threadContext.stashContext()) {
                handleRequest(asynchronousSearchId, request, listener, user);
            }
        }
    }