public void persistResponse()

in src/main/java/org/opensearch/search/asynchronous/processor/AsynchronousSearchPostProcessor.java [104:174]


    /**
     * Persists the response of an asynchronous search and then frees its active (in-memory) context.
     * <p>
     * The method first requests all permits of the context, passing a 120 second time value and the
     * reason {@code "persisting response"}. What happens next depends on the outcome:
     * <ul>
     *   <li>Permits acquired but {@code shouldPersist()} is {@code false}: the permits are released and
     *       nothing is stored.</li>
     *   <li>Permits acquired and the context should persist: the model is handed to the persistence
     *       service. On success a {@link SearchResponsePersistedEvent} is triggered, on failure a
     *       {@link SearchResponsePersistFailedEvent}; in both cases the active context is handed to
     *       {@code freeActiveContextConsumer} and the permits are released afterwards.</li>
     *   <li>Permits could not be acquired: the failure is logged and the active context is handed to
     *       {@code freeActiveContextConsumer}.</li>
     * </ul>
     * All outcomes are reported through logging only; nothing is returned to the caller.
     *
     * @param asynchronousSearchContext the active context whose response is to be persisted
     * @param persistenceModel          the model passed to the persistence service for storage
     */
    public void persistResponse(AsynchronousSearchActiveContext asynchronousSearchContext,
                                AsynchronousSearchPersistenceModel persistenceModel) {
        // acquire all permits non-blocking
        asynchronousSearchContext.acquireAllContextPermits(ActionListener.wrap(releasable -> {
                    // check again after acquiring permit if the context has been deleted mean while
                    if (asynchronousSearchContext.shouldPersist() == false) {
                        logger.debug(
                                "Async search context [{}] has been closed while waiting to acquire permits for post processing",
                                asynchronousSearchContext.getAsynchronousSearchId());
                        // nothing will be stored, so hand the permits back right away
                        releasable.close();
                        return;
                    }
                    logger.debug("Persisting response for asynchronous search id [{}]",
                            asynchronousSearchContext.getAsynchronousSearchId());
                    // The thread context is stashed only for the duration of the storeResponse call.
                    try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
                        // runAfter(..., releasable::close) releases the permits once either callback below has run
                        asynchronousSearchPersistenceService.storeResponse(asynchronousSearchContext.getAsynchronousSearchId(),
                                persistenceModel, ActionListener.runAfter(ActionListener.wrap(
                                        (indexResponse) -> {
                                            //Mark any dangling reference as PERSISTED and cleaning it up from the IN_MEMORY context
                                            logger.debug("Successfully persisted response for asynchronous search id [{}]",
                                                    asynchronousSearchContext.getAsynchronousSearchId());
                                            try {
                                                asynchronousSearchStateMachine.trigger(new SearchResponsePersistedEvent(
                                                        asynchronousSearchContext));
                                            } catch (AsynchronousSearchStateMachineClosedException ex) {
                                                // this should never happen since we had checked after acquiring the all permits so a
                                                // concurrent delete is not expected here, however an external task cancellation
                                                // can cause this
                                                logger.warn("Unexpected state, possibly caused by external task cancellation," +
                                                                " context with id [{}] closed while triggering event [{}]",
                                                        asynchronousSearchContext.getAsynchronousSearchId(),
                                                        SearchResponsePersistedEvent.class.getName());
                                            } finally {
                                                // always free the active context, whether or not the event could be triggered
                                                freeActiveContextConsumer.accept(asynchronousSearchContext);
                                            }
                                        },

                                        (e) -> {
                                            try {
                                                asynchronousSearchStateMachine.trigger(new SearchResponsePersistFailedEvent(
                                                        asynchronousSearchContext));
                                            } catch (AsynchronousSearchStateMachineClosedException ex) {
                                                //this should never happen since we had checked after acquiring the all permits so a
                                                // concurrent delete is not expected here, however an external task cancellation
                                                // can cause this
                                                logger.warn("Unexpected state, possibly caused by external task cancellation," +
                                                                " context with id [{}] closed while triggering event [{}]",
                                                        asynchronousSearchContext.getAsynchronousSearchId(),
                                                        SearchResponsePersistFailedEvent.class.getName());
                                            } finally {
                                                // always free the active context, whether or not the event could be triggered
                                                freeActiveContextConsumer.accept(asynchronousSearchContext);
                                            }
                                            // Pass the exception as the explicit throwable as well: with two placeholders and
                                            // two arguments it is otherwise only rendered as text and its stack trace is lost.
                                            logger.error(() -> new ParameterizedMessage(
                                                    "Failed to persist final response for [{}] due to [{}]",
                                                    asynchronousSearchContext.getAsynchronousSearchId(), e), e);
                                        }
                                ), releasable::close));
                    }

                }, (e) -> {
                    // Failure to acquire permits is expected for a TimeoutException or an
                    // AsynchronousSearchContextClosedException (logged at DEBUG); anything else is logged at WARN.
                    // If we weren't able to acquire permits we clean up the context to release heap.
                    Throwable cause = ExceptionsHelper.unwrapCause(e);
                    Level level = cause instanceof AsynchronousSearchContextClosedException || cause instanceof TimeoutException
                            ? Level.DEBUG : Level.WARN;
                    logger.log(level, () -> new ParameterizedMessage("Exception  occured while acquiring the permit for " +
                            "asynchronousSearchContext [{}]", asynchronousSearchContext.getAsynchronousSearchId()), e);
                    freeActiveContextConsumer.accept(asynchronousSearchContext);
                }),
                TimeValue.timeValueSeconds(120), "persisting response");
    }